Changeset 8decff7 for test


Ignore:
Timestamp:
09/16/15 13:19:13 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
1101175
Parents:
5478d3d
Message:

Update all parallel tests so they compile and run

Location:
test
Files:
8 edited

Legend:

Unmodified
Added
Removed
  • test/test-format-parallel-hasher.c

    r6b98325 r8decff7  
    9494        bool seen_start_message;
    9595        bool seen_stop_message;
    96         bool seen_resumed_message;
     96        bool seen_resuming_message;
    9797        bool seen_pausing_message;
    9898        int count;
    9999};
    100100
    101 static int totalpkts = 0;
    102 static int expected;
    103 static void report_result(libtrace_t *trace UNUSED, int mesg,
    104                           libtrace_generic_t data,
    105                           libtrace_thread_t *sender UNUSED) {
    106         static int totalthreads = 0;
    107         switch (mesg) {
    108         case MESSAGE_RESULT:
    109                 assert(data.res->key == 0);
    110                 printf("%d,", data.res->value.sint);
    111                 totalthreads++;
    112                 totalpkts += data.res->value.sint;
    113                 assert(data.res->value.sint == 25 ||
    114                        data.res->value.sint == expected - 25);
    115                 break;
    116         case MESSAGE_STARTING:
    117                 // Should have two threads here
    118                 assert(libtrace_get_perpkt_count(trace) == 2);
    119                 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
    120                 break;
    121         case MESSAGE_STOPPING:
    122                 printf(")\n");
    123                 assert(totalthreads == libtrace_get_perpkt_count(trace));
    124                 break;
     101struct final {
     102        int threads;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->threads = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->key == 0);
     129
     130        threadcounter->threads ++;
     131        threadcounter->packets += res->value.sint;
     132
     133        assert(res->value.sint == 25 || res->value.sint == 75);
     134        printf("%d\n", res->value.sint);
     135}
     136
     137static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     138                void *global, void *tls) {
     139
     140        uint32_t *magic = (uint32_t *)global;
     141        struct final *threadcounter = (struct final *)tls;
     142
     143        assert(*magic == 0xabcdef);
     144        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
     145        assert(threadcounter->packets == 100);
     146
     147        free(threadcounter);
     148}
     149
     150static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     151                libtrace_thread_t *t UNUSED,
     152                void *global, void *tls, libtrace_packet_t *packet) {
     153        struct TLS *storage = (struct TLS *)tls;
     154        uint32_t *magic = (uint32_t *)global;
     155        static __thread int count = 0;
     156        int a,*b,c=0;
     157
     158        assert(storage != NULL);
     159        assert(!storage->seen_stop_message);
     160
     161        if (storage->seen_pausing_message)
     162                assert(storage->seen_resuming_message);
     163
     164        assert(*magic == 0xabcdef);
     165
     166        storage->count ++;
     167        count ++;
     168
     169        assert(count == storage->count);
     170
     171        if (count > 100) {
     172                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     173                kill(getpid(), SIGTERM);
     174        }
     175
     176        // Do some work to even out the load on cores
     177        b = &c;
     178        for (a = 0; a < 10000000; a++) {
     179                c += a**b;
     180        }
     181
     182        return packet;
     183}
     184
     185static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     186                void *global) {
     187
     188        static __thread bool seen_start_message = false;
     189        uint32_t *magic = (uint32_t *)global;
     190        struct TLS *storage = NULL;
     191        assert(*magic == 0xabcdef);
     192
     193        assert(!seen_start_message);
     194        assert(trace);
     195
     196        storage = (struct TLS *)malloc(sizeof(struct TLS));
     197        storage->seen_start_message = true;
     198        storage->seen_stop_message = false;
     199        storage->seen_resuming_message = false;
     200        storage->seen_pausing_message = false;
     201        storage->count = 0;
     202
     203        seen_start_message = true;
     204
     205        return storage;
     206}
     207
     208static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
     209                void *global, void *tls) {
     210
     211        static __thread bool seen_stop_message = false;
     212        struct TLS *storage = (struct TLS *)tls;
     213        uint32_t *magic = (uint32_t *)global;
     214
     215        assert(storage != NULL);
     216        assert(!storage->seen_stop_message);
     217        assert(!seen_stop_message);
     218        assert(storage->seen_start_message);
     219        assert(*magic == 0xabcdef);
     220
     221        seen_stop_message = true;
     222        storage->seen_stop_message = true;
     223
     224        assert(storage->count == 25 || storage->count == 75);
     225
     226        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
     227        trace_post_reporter(trace);
     228        free(storage);
     229}
     230
     231static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     232                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     233
     234        fprintf(stderr, "Not expecting a tick packet\n");
     235        kill(getpid(), SIGTERM);
     236}
     237
     238static void pause_processing(libtrace_t *trace UNUSED,
     239                libtrace_thread_t *t UNUSED,
     240                void *global, void *tls) {
     241
     242        static __thread bool seen_pause_message = false;
     243        struct TLS *storage = (struct TLS *)tls;
     244        uint32_t *magic = (uint32_t *)global;
     245
     246        assert(storage != NULL);
     247        assert(!storage->seen_stop_message);
     248        assert(storage->seen_start_message);
     249        assert(*magic == 0xabcdef);
     250
     251        assert(seen_pause_message == storage->seen_pausing_message);
     252
     253        seen_pause_message = true;
     254        storage->seen_pausing_message = true;
     255}
     256
     257static void resume_processing(libtrace_t *trace UNUSED,
     258                libtrace_thread_t *t UNUSED,
     259                void *global, void *tls) {
     260
     261        static __thread bool seen_resume_message = false;
     262        struct TLS *storage = (struct TLS *)tls;
     263        uint32_t *magic = (uint32_t *)global;
     264
     265        assert(storage != NULL);
     266        assert(!storage->seen_stop_message);
     267        assert(storage->seen_start_message);
     268        assert(*magic == 0xabcdef);
     269
     270        assert(seen_resume_message == storage->seen_resuming_message);
     271
     272        seen_resume_message = true;
     273        storage->seen_resuming_message = true;
     274}
     275
     276uint64_t custom_hash(const libtrace_packet_t *packet UNUSED, void *data) {
     277        int *count = (int *)data;
     278        *count += 1;
     279
     280        /* Just throw the first 25 packets to thread 0 and the rest to thread
     281         * 1.
     282         */
     283        if (*count <= 25)
     284                return 0;
     285        return 1;
     286}
     287
     288int main(int argc, char *argv[]) {
     289        int error = 0;
     290        const char *tracename;
     291        libtrace_t *trace;
     292        libtrace_callback_set_t *processing = NULL;
     293        libtrace_callback_set_t *reporter = NULL;
     294        uint32_t global = 0xabcdef;
     295        int hashercount = 0;
     296
     297        if (argc<2) {
     298                fprintf(stderr,"usage: %s type\n",argv[0]);
     299                return 1;
    125300        }
    126 }
    127 
    128 static int x;
    129 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    130                         int mesg, libtrace_generic_t data,
    131                         libtrace_thread_t *sender UNUSED) {
    132         struct TLS *tls;
    133         void* ret;
    134         tls = trace_get_tls(t);
    135         int a,*b,c=0;
    136 
    137         switch (mesg) {
    138         case MESSAGE_PACKET:
    139                 assert(tls != NULL);
    140                 assert(!(tls->seen_stop_message));
    141                 tls->count++;
    142                 if (tls->count>100) {
    143                         fprintf(stderr, "Too many packets someone should stop me!!\n");
    144                         kill(getpid(), SIGTERM);
    145                 }
    146                 // Do some work to even out the load on cores
    147                 b = &c;
    148                 for (a = 0; a < 10000000; a++) {
    149                         c += a**b;
    150                 }
    151                 x = c;
    152                 return data.pkt;
    153         case MESSAGE_STARTING:
    154                 assert(tls == NULL);
    155                 tls = calloc(sizeof(struct TLS), 1);
    156                 ret = trace_set_tls(t, tls);
    157                 assert(ret == NULL);
    158                 tls->seen_start_message = true;
    159                 break;
    160         case MESSAGE_STOPPING:
    161                 assert(tls->seen_start_message);
    162                 assert(tls != NULL);
    163                 tls->seen_stop_message = true;
    164                 trace_set_tls(t, NULL);
    165 
    166                 // All threads publish to verify the thread count
    167                 assert(tls->count == 25 || tls->count == 75);
    168                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    169                 trace_post_reporter(trace);
    170                 free(tls);
    171                 break;
    172         case MESSAGE_TICK_INTERVAL:
    173         case MESSAGE_TICK_COUNT:
    174                 assert(tls->seen_start_message );
    175                 fprintf(stderr, "Not expecting a tick packet\n");
    176                 kill(getpid(), SIGTERM);
    177                 break;
    178         case MESSAGE_PAUSING:
    179                 assert(tls->seen_start_message);
    180                 tls->seen_pausing_message = true;
    181                 break;
    182         case MESSAGE_RESUMING:
    183                 assert(tls->seen_pausing_message  || tls->seen_start_message);
    184                 tls->seen_resumed_message = true;
    185                 break;
    186         }
    187         return NULL;
    188 }
    189 
    190 
    191 /**
    192  * Sends the first 25 packets to thread 0, the next 75 to thread 1
    193  * This is based on a few internal workings assumptions, which
    194  * might change and still be valid even if this test fails!!.
    195  */
    196 uint64_t hash25_75(const libtrace_packet_t* packet UNUSED, void *data) {
    197         int *count = (int *) data;
    198         *count += 1;
    199         if (*count <= 25)
    200                 return 0;
    201         return 1;
    202 }
    203 
    204 /**
    205  * Test that the hasher function works
    206  */
    207 int test_hasher(const char *tracename) {
    208         libtrace_t *trace;
    209         int error = 0;
    210         int hashercount = 0;
    211         printf("Testing hasher function\n");
    212 
    213         // Create the trace
     301
     302        tracename = lookup_uri(argv[1]);
     303
    214304        trace = trace_create(tracename);
    215305        iferr(trace,tracename);
    216306
    217         // Always use 2 threads for simplicity
    218         trace_set_perpkt_threads(trace, 2);
    219         trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount);
    220 
    221         // Start it
    222         trace_pstart(trace, NULL, per_packet, report_result);
     307        processing = trace_create_callback_set();
     308        trace_set_starting_cb(processing, start_processing);
     309        trace_set_stopping_cb(processing, stop_processing);
     310        trace_set_packet_cb(processing, per_packet);
     311        trace_set_pausing_cb(processing, pause_processing);
     312        trace_set_resuming_cb(processing, resume_processing);
     313        trace_set_tick_count_cb(processing, process_tick);
     314        trace_set_tick_interval_cb(processing, process_tick);
     315
     316        reporter = trace_create_callback_set();
     317        trace_set_starting_cb(reporter, report_start);
     318        trace_set_stopping_cb(reporter, report_end);
     319        trace_set_result_cb(reporter, report_cb);
     320
     321
     322        /* Set up our hasher and our two threads */
     323        trace_set_perpkt_threads(trace, 2);
     324        trace_set_hasher(trace, HASHER_CUSTOM, &custom_hash, &hashercount);
     325
     326        trace_pstart(trace, &global, processing, reporter);
    223327        iferr(trace,tracename);
    224         /* Make sure traces survive a pause and restart */
     328
     329        /* Make sure traces survive a pause */
    225330        trace_ppause(trace);
    226331        iferr(trace,tracename);
     
    231336        trace_join(trace);
    232337
     338        global = 0xffffffff;
     339
    233340        /* Now check we have all received all the packets */
    234         if (error == 0) {
    235                 if (totalpkts == expected) {
    236                         printf("success: %d packets read\n",expected);
    237                 } else {
    238                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    239                         error = 1;
    240                 }
    241         } else {
     341        if (error != 0) {
    242342                iferr(trace,tracename);
    243343        }
    244     trace_destroy(trace);
    245     return error;
    246 }
    247 
    248 
    249 
    250 int main(int argc, char *argv[]) {
    251         int error = 0;
    252         const char *tracename;
    253         expected = 100;
    254 
    255         if (argc<2) {
    256                 fprintf(stderr,"usage: %s type\n",argv[0]);
    257                 return 1;
    258         }
    259 
    260         tracename = lookup_uri(argv[1]);
    261 
    262         if (strcmp(argv[1],"rtclient")==0) expected=101;
    263 
    264         error = test_hasher(tracename);
    265 
    266     return error;
    267 }
     344
     345        trace_destroy(trace);
     346        trace_destroy_callback_set(processing);
     347        trace_destroy_callback_set(reporter);
     348        return error;
     349}
  • test/test-format-parallel-reporter.c

    r7c95027 r8decff7  
    5050#include "dagformat.h"
    5151#include "libtrace_parallel.h"
     52#include "data-struct/vector.h"
    5253
    5354void iferr(libtrace_t *trace,const char *msg)
     
    9091}
    9192
    92 int globalcount = 0;
    93 
    94 static void reporter(libtrace_t *libtrace UNUSED, int mesg,
    95                      libtrace_generic_t data,
    96                      libtrace_thread_t *sender UNUSED) {
    97         static uint64_t last = -1;
    98         static int pktcount = 0;
    99         libtrace_packet_t *packet;
    100         switch (mesg) {
    101         case MESSAGE_RESULT:
    102                 packet = data.res->value.pkt;
    103                 assert(data.res->key == trace_packet_get_order(packet));
    104                 if(last == (uint64_t)-1) {
    105                         last = data.res->key;
    106                 } else {
    107                         assert (last < data.res->key);
    108                         last = data.res->key;
    109                 }
    110                 pktcount++;
    111                 trace_free_packet(libtrace, packet);
    112                 break;
    113         case MESSAGE_STOPPING:
    114                 globalcount = pktcount;
    115                 break;
    116         }
    117 }
    118 
    119 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    120                         int mesg, libtrace_generic_t data,
    121                         libtrace_thread_t *sender UNUSED) {
    122         UNUSED static __thread int x = 0;
    123 
    124         if (mesg == MESSAGE_PACKET) {
    125                 int a,*b,c=0;
    126                 // Do some work to even out the load on cores
    127                 b = &c;
    128                 for (a = 0; a < 10000000; a++) {
    129                         c += a**b;
    130                 }
    131                 x = c;
    132                 trace_publish_result(trace, t, trace_packet_get_order(data.pkt), (libtrace_generic_t){.pkt=data.pkt}, RESULT_PACKET);
    133         }
    134         return NULL;
     93struct TLS {
     94        bool seen_start_message;
     95        bool seen_stop_message;
     96        bool seen_resuming_message;
     97        bool seen_pausing_message;
     98        int count;
     99};
     100
     101struct final {
     102        uint64_t last;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->last = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->type == RESULT_PACKET);
     129     
     130        if (threadcounter->last != 0)
     131                assert(threadcounter->last + 1 == res->key);
     132        threadcounter->last = res->key;
     133
     134        threadcounter->packets += 1;
     135        trace_free_packet(trace, res->value.pkt);
     136}
     137
     138static void report_end(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     139                void *global, void *tls) {
     140
     141        uint32_t *magic = (uint32_t *)global;
     142        struct final *threadcounter = (struct final *)tls;
     143
     144        assert(*magic == 0xabcdef);
     145        assert(threadcounter->packets == 100);
     146
     147        free(threadcounter);
     148}
     149
     150static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     151                libtrace_thread_t *t UNUSED,
     152                void *global, void *tls, libtrace_packet_t *packet) {
     153        struct TLS *storage = (struct TLS *)tls;
     154        uint32_t *magic = (uint32_t *)global;
     155        static __thread int count = 0;
     156        int a,*b,c=0;
     157
     158        assert(storage != NULL);
     159        assert(!storage->seen_stop_message);
     160
     161        if (storage->seen_pausing_message)
     162                assert(storage->seen_resuming_message);
     163
     164        assert(*magic == 0xabcdef);
     165
     166        storage->count ++;
     167        count ++;
     168
     169        assert(count == storage->count);
     170
     171        if (count > 100) {
     172                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     173                kill(getpid(), SIGTERM);
     174        }
     175
     176        // Do some work to even out the load on cores
     177        b = &c;
     178        for (a = 0; a < 10000000; a++) {
     179                c += a**b;
     180        }
     181
     182        trace_publish_result(trace, t, trace_packet_get_order(packet),
     183                        (libtrace_generic_t){.pkt=packet}, RESULT_PACKET);
     184
     185        return NULL;
     186}
     187
     188static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     189                void *global) {
     190
     191        static __thread bool seen_start_message = false;
     192        uint32_t *magic = (uint32_t *)global;
     193        struct TLS *storage = NULL;
     194        assert(*magic == 0xabcdef);
     195
     196        assert(!seen_start_message);
     197        assert(trace);
     198
     199        storage = (struct TLS *)malloc(sizeof(struct TLS));
     200        storage->seen_start_message = true;
     201        storage->seen_stop_message = false;
     202        storage->seen_resuming_message = false;
     203        storage->seen_pausing_message = false;
     204        storage->count = 0;
     205
     206        seen_start_message = true;
     207
     208        return storage;
     209}
     210
     211static void stop_processing(libtrace_t *trace UNUSED,
     212                libtrace_thread_t *t UNUSED,
     213                void *global, void *tls) {
     214
     215        static __thread bool seen_stop_message = false;
     216        struct TLS *storage = (struct TLS *)tls;
     217        uint32_t *magic = (uint32_t *)global;
     218
     219        assert(storage != NULL);
     220        assert(!storage->seen_stop_message);
     221        assert(!seen_stop_message);
     222        assert(storage->seen_start_message);
     223        assert(*magic == 0xabcdef);
     224
     225        seen_stop_message = true;
     226        storage->seen_stop_message = true;
     227        free(storage);
     228}
     229
     230static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     231                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     232
     233        fprintf(stderr, "Not expecting a tick packet\n");
     234        kill(getpid(), SIGTERM);
     235}
     236
     237static void pause_processing(libtrace_t *trace UNUSED,
     238                libtrace_thread_t *t UNUSED,
     239                void *global, void *tls) {
     240
     241        static __thread bool seen_pause_message = false;
     242        struct TLS *storage = (struct TLS *)tls;
     243        uint32_t *magic = (uint32_t *)global;
     244
     245        assert(storage != NULL);
     246        assert(!storage->seen_stop_message);
     247        assert(storage->seen_start_message);
     248        assert(*magic == 0xabcdef);
     249
     250        assert(seen_pause_message == storage->seen_pausing_message);
     251
     252        seen_pause_message = true;
     253        storage->seen_pausing_message = true;
     254}
     255
     256static void resume_processing(libtrace_t *trace UNUSED,
     257                libtrace_thread_t *t UNUSED,
     258                void *global, void *tls) {
     259
     260        static __thread bool seen_resume_message = false;
     261        struct TLS *storage = (struct TLS *)tls;
     262        uint32_t *magic = (uint32_t *)global;
     263
     264        assert(storage != NULL);
     265        assert(!storage->seen_stop_message);
     266        assert(storage->seen_start_message);
     267        assert(*magic == 0xabcdef);
     268
     269        assert(seen_resume_message == storage->seen_resuming_message);
     270
     271        seen_resume_message = true;
     272        storage->seen_resuming_message = true;
    135273}
    136274
    137275int main(int argc, char *argv[]) {
    138276        int error = 0;
    139         int expected = 100;
    140277        const char *tracename;
    141278        libtrace_t *trace;
     279        libtrace_callback_set_t *processing = NULL;
     280        libtrace_callback_set_t *reporter = NULL;
     281        uint32_t global = 0xabcdef;
    142282
    143283        if (argc<2) {
     
    151291        iferr(trace,tracename);
    152292
    153         if (strcmp(argv[1],"rtclient")==0) expected=101;
    154 
    155         trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
    156 
    157         trace_pstart(trace, NULL, per_packet, reporter);
     293        processing = trace_create_callback_set();
     294        trace_set_starting_cb(processing, start_processing);
     295        trace_set_stopping_cb(processing, stop_processing);
     296        trace_set_packet_cb(processing, per_packet);
     297        trace_set_pausing_cb(processing, pause_processing);
     298        trace_set_resuming_cb(processing, resume_processing);
     299        trace_set_tick_count_cb(processing, process_tick);
     300        trace_set_tick_interval_cb(processing, process_tick);
     301
     302        reporter = trace_create_callback_set();
     303        trace_set_starting_cb(reporter, report_start);
     304        trace_set_stopping_cb(reporter, report_end);
     305        trace_set_result_cb(reporter, report_cb);
     306
     307
     308        /* Test ordered combiner */
     309        trace_set_perpkt_threads(trace, 4);
     310        trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0});
     311
     312        trace_pstart(trace, &global, processing, reporter);
    158313        iferr(trace,tracename);
    159314
     
    167322        trace_join(trace);
    168323
    169         if (error == 0) {
    170                 if (globalcount == expected) {
    171                         printf("success: %d packets read\n",expected);
    172                 } else {
    173                         printf("failure: %d packets expected, %d seen\n",expected,globalcount);
    174                         error = 1;
    175                 }
    176         } else {
     324        global = 0xffffffff;
     325
     326        /* Now check we have all received all the packets */
     327        if (error != 0) {
    177328                iferr(trace,tracename);
    178329        }
    179         trace_destroy(trace);
    180         return error;
    181 }
     330
     331        trace_destroy(trace);
     332        trace_destroy_callback_set(processing);
     333        trace_destroy_callback_set(reporter);
     334        return error;
     335}
  • test/test-format-parallel-singlethreaded-hasher.c

    r6b98325 r8decff7  
    9191}
    9292
    93 
    9493struct TLS {
    9594        bool seen_start_message;
     
    10099};
    101100
    102 static int totalpkts = 0;
    103 static int expected;
    104 static void report_result(libtrace_t *trace UNUSED, int mesg,
    105                           libtrace_generic_t data,
    106                           libtrace_thread_t *sender UNUSED) {
    107         static int totalthreads = 0;
    108         switch (mesg) {
    109         case MESSAGE_RESULT:
    110                 assert(data.res->key == 0);
    111                 printf("%d,", data.res->value.sint);
    112                 totalthreads++;
    113                 totalpkts += data.res->value.sint;
    114                 break;
    115         case MESSAGE_STARTING:
    116                 // Should have a single thread here
    117                 assert(libtrace_get_perpkt_count(trace) == 1);
    118                 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
    119                 break;
    120         case MESSAGE_STOPPING:
    121                 printf(")\n");
    122                 assert(totalthreads == libtrace_get_perpkt_count(trace));
    123                 break;
     101struct final {
     102        int threads;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->threads = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->key == 0);
     129
     130        threadcounter->threads ++;
     131        threadcounter->packets += res->value.sint;
     132
     133        assert(res->value.sint == 100);
     134        printf("%d\n", res->value.sint);
     135}
     136
     137static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     138                void *global, void *tls) {
     139
     140        uint32_t *magic = (uint32_t *)global;
     141        struct final *threadcounter = (struct final *)tls;
     142
     143        assert(*magic == 0xabcdef);
     144        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
     145        assert(threadcounter->packets == 100);
     146
     147        free(threadcounter);
     148}
     149
     150static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     151                libtrace_thread_t *t UNUSED,
     152                void *global, void *tls, libtrace_packet_t *packet) {
     153        struct TLS *storage = (struct TLS *)tls;
     154        uint32_t *magic = (uint32_t *)global;
     155        static __thread int count = 0;
     156        int a,*b,c=0;
     157
     158        assert(storage != NULL);
     159        assert(!storage->seen_stop_message);
     160
     161        if (storage->seen_pausing_message)
     162                assert(storage->seen_resuming_message);
     163
     164        assert(*magic == 0xabcdef);
     165
     166        storage->count ++;
     167        count ++;
     168
     169        assert(count == storage->count);
     170
     171        if (count > 100) {
     172                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     173                kill(getpid(), SIGTERM);
     174        }
     175
     176        // Do some work to even out the load on cores
     177        b = &c;
     178        for (a = 0; a < 10000000; a++) {
     179                c += a**b;
     180        }
     181
     182        return packet;
     183}
     184
     185static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     186                void *global) {
     187
     188        static __thread bool seen_start_message = false;
     189        uint32_t *magic = (uint32_t *)global;
     190        struct TLS *storage = NULL;
     191        assert(*magic == 0xabcdef);
     192
     193        assert(!seen_start_message);
     194        assert(trace);
     195
     196        storage = (struct TLS *)malloc(sizeof(struct TLS));
     197        storage->seen_start_message = true;
     198        storage->seen_stop_message = false;
     199        storage->seen_resuming_message = false;
     200        storage->seen_pausing_message = false;
     201        storage->count = 0;
     202
     203        seen_start_message = true;
     204
     205        return storage;
     206}
     207
     208static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
     209                void *global, void *tls) {
     210
     211        static __thread bool seen_stop_message = false;
     212        struct TLS *storage = (struct TLS *)tls;
     213        uint32_t *magic = (uint32_t *)global;
     214
     215        assert(storage != NULL);
     216        assert(!storage->seen_stop_message);
     217        assert(!seen_stop_message);
     218        assert(storage->seen_start_message);
     219        assert(*magic == 0xabcdef);
     220
     221        seen_stop_message = true;
     222        storage->seen_stop_message = true;
     223
     224        assert(storage->count == 100);
     225
     226        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
     227        trace_post_reporter(trace);
     228        free(storage);
     229}
     230
     231static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     232                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     233
     234        fprintf(stderr, "Not expecting a tick packet\n");
     235        kill(getpid(), SIGTERM);
     236}
     237
     238static void pause_processing(libtrace_t *trace UNUSED,
     239                libtrace_thread_t *t UNUSED,
     240                void *global, void *tls) {
     241
     242        static __thread bool seen_pause_message = false;
     243        struct TLS *storage = (struct TLS *)tls;
     244        uint32_t *magic = (uint32_t *)global;
     245
     246        assert(storage != NULL);
     247        assert(!storage->seen_stop_message);
     248        assert(storage->seen_start_message);
     249        assert(*magic == 0xabcdef);
     250
     251        assert(seen_pause_message == storage->seen_pausing_message);
     252
     253        seen_pause_message = true;
     254        storage->seen_pausing_message = true;
     255}
     256
     257static void resume_processing(libtrace_t *trace UNUSED,
     258                libtrace_thread_t *t UNUSED,
     259                void *global, void *tls) {
     260
     261        static __thread bool seen_resume_message = false;
     262        struct TLS *storage = (struct TLS *)tls;
     263        uint32_t *magic = (uint32_t *)global;
     264
     265        assert(storage != NULL);
     266        assert(!storage->seen_stop_message);
     267        assert(storage->seen_start_message);
     268        assert(*magic == 0xabcdef);
     269
     270        assert(seen_resume_message == storage->seen_resuming_message);
     271
     272        seen_resume_message = true;
     273        storage->seen_resuming_message = true;
     274}
     275
     276uint64_t custom_hash(const libtrace_packet_t *packet UNUSED, void *data) {
     277        int *count = (int *)data;
     278        *count += 1;
     279
     280        /* Just throw the first 25 packets to thread 0 and the rest to thread
     281         * 1.
     282         */
     283        if (*count <= 25)
     284                return 0;
     285        return 1;
     286}
     287
     288int main(int argc, char *argv[]) {
     289        int error = 0;
     290        const char *tracename;
     291        libtrace_t *trace;
     292        libtrace_callback_set_t *processing = NULL;
     293        libtrace_callback_set_t *reporter = NULL;
     294        uint32_t global = 0xabcdef;
     295        int hashercount = 0;
     296
     297        if (argc<2) {
     298                fprintf(stderr,"usage: %s type\n",argv[0]);
     299                return 1;
    124300        }
    125 }
    126 
    127 static int x;
    128 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    129                         int mesg, libtrace_generic_t data,
    130                         libtrace_thread_t *sender UNUSED) {
    131         struct TLS *tls;
    132         void* ret;
    133         int a,*b,c=0;
    134         tls = trace_get_tls(t);
    135 
    136         switch (mesg) {
    137         case MESSAGE_PACKET:
    138                 assert(tls != NULL);
    139                 assert(!(tls->seen_stop_message));
    140                 tls->count++;
    141                 if (tls->count>100) {
    142                         fprintf(stderr, "Too many packets someone should stop me!!\n");
    143                         kill(getpid(), SIGTERM);
    144                 }
    145                 // Do some work to even out the load on cores
    146                 b = &c;
    147                 for (a = 0; a < 10000000; a++) {
    148                         c += a**b;
    149                 }
    150                 x = c;
    151                 return data.pkt;
    152         case MESSAGE_STARTING:
    153                 assert(tls == NULL);
    154                 tls = calloc(sizeof(struct TLS), 1);
    155                 ret = trace_set_tls(t, tls);
    156                 assert(ret == NULL);
    157                 tls->seen_start_message = true;
    158                 break;
    159         case MESSAGE_STOPPING:
    160                 assert(tls->seen_start_message);
    161                 assert(tls != NULL);
    162                 tls->seen_stop_message = true;
    163                 trace_set_tls(t, NULL);
    164 
    165                 // All threads publish to verify the thread count
    166                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    167                 trace_post_reporter(trace);
    168                 free(tls);
    169                 break;
    170         case MESSAGE_TICK_INTERVAL:
    171         case MESSAGE_TICK_COUNT:
    172                 assert(tls->seen_start_message );
    173                 fprintf(stderr, "Not expecting a tick packet\n");
    174                 kill(getpid(), SIGTERM);
    175                 break;
    176         case MESSAGE_PAUSING:
    177                 assert(tls->seen_start_message);
    178                 tls->seen_pausing_message = true;
    179                 break;
    180         case MESSAGE_RESUMING:
    181                 assert(tls->seen_pausing_message || tls->seen_start_message);
    182                 tls->seen_resuming_message = true;
    183                 break;
    184         }
    185         return NULL;
    186 }
    187 
    188 /**
    189  * Sends the first 25 packets to thread 0, the next 75 to thread 1
    190  * This is based on a few internal workings assumptions, which
    191  * might change and still be valid even if this test fails!!.
    192  */
    193 uint64_t hash25_75(const libtrace_packet_t* packet UNUSED, void *data) {
    194         int *count = (int *) data;
    195         *count += 1;
    196         if (*count <= 25)
    197                 return 0;
    198         return 1;
    199 }
    200 
    201 /**
    202  * Test that the hasher function works in single threaded mode
    203  * It might not be called but this ensures consistency
    204  */
    205 int test_hasher_singlethreaded(const char *tracename) {
    206         libtrace_t *trace;
    207         int error = 0;
    208         int hashercount = 0;
    209         printf("Testing hasher singlethreaded function\n");
    210 
    211         // Create the trace
     301
     302        tracename = lookup_uri(argv[1]);
     303
    212304        trace = trace_create(tracename);
    213305        iferr(trace,tracename);
    214306
    215         // Use 1 thread with a hasher
    216         trace_set_perpkt_threads(trace, 1);
    217         trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount);
    218 
    219         // Start it
    220         trace_pstart(trace, NULL, per_packet, report_result);
     307        processing = trace_create_callback_set();
     308        trace_set_starting_cb(processing, start_processing);
     309        trace_set_stopping_cb(processing, stop_processing);
     310        trace_set_packet_cb(processing, per_packet);
     311        trace_set_pausing_cb(processing, pause_processing);
     312        trace_set_resuming_cb(processing, resume_processing);
     313        trace_set_tick_count_cb(processing, process_tick);
     314        trace_set_tick_interval_cb(processing, process_tick);
     315
     316        reporter = trace_create_callback_set();
     317        trace_set_starting_cb(reporter, report_start);
     318        trace_set_stopping_cb(reporter, report_end);
     319        trace_set_result_cb(reporter, report_cb);
     320
     321
     322        /* Set up our hasher and our single thread */
     323        trace_set_perpkt_threads(trace, 1);
     324        trace_set_hasher(trace, HASHER_CUSTOM, &custom_hash, &hashercount);
     325
     326        trace_pstart(trace, &global, processing, reporter);
    221327        iferr(trace,tracename);
    222328
    223         /* Make sure traces survive a pause and restart */
     329        /* Make sure traces survive a pause */
    224330        trace_ppause(trace);
    225331        iferr(trace,tracename);
     
    230336        trace_join(trace);
    231337
     338        global = 0xffffffff;
     339
    232340        /* Now check we have all received all the packets */
    233         if (error == 0) {
    234                 if (totalpkts == expected) {
    235                         printf("success: %d packets read\n",expected);
    236                 } else {
    237                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    238                         error = 1;
    239                 }
    240         } else {
     341        if (error != 0) {
    241342                iferr(trace,tracename);
    242343        }
    243     trace_destroy(trace);
    244     return error;
    245 }
    246 
    247 
    248 int main(int argc, char *argv[]) {
    249         int error = 0;
    250         const char *tracename;
    251         expected = 100;
    252 
    253         if (argc<2) {
    254                 fprintf(stderr,"usage: %s type\n",argv[0]);
    255                 return 1;
    256         }
    257 
    258         tracename = lookup_uri(argv[1]);
    259 
    260         if (strcmp(argv[1],"rtclient")==0) expected=101;
    261 
    262         error = test_hasher_singlethreaded(tracename);
    263     return error;
    264 }
     344
     345        trace_destroy(trace);
     346        trace_destroy_callback_set(processing);
     347        trace_destroy_callback_set(reporter);
     348        return error;
     349}
  • test/test-format-parallel-singlethreaded.c

    r6b98325 r8decff7  
    9191}
    9292
    93 
    9493struct TLS {
    9594        bool seen_start_message;
     
    10099};
    101100
    102 static int totalpkts = 0;
    103 static void report_result(libtrace_t *trace UNUSED, int mesg,
    104                           libtrace_generic_t data,
    105                           libtrace_thread_t *sender UNUSED) {
    106         static int totalthreads = 0;
    107         switch (mesg) {
    108         case MESSAGE_RESULT:
    109                 assert(data.res->key == 0);
    110                 printf("%d,", data.res->value.sint);
    111                 totalthreads++;
    112                 totalpkts += data.res->value.sint;
    113                 break;
    114         case MESSAGE_STARTING:
    115                 // Should have a single thread here
    116                 assert(libtrace_get_perpkt_count(trace) == 1);
    117                 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
    118                 break;
    119         case MESSAGE_STOPPING:
    120                 printf(")\n");
    121                 assert(totalthreads == libtrace_get_perpkt_count(trace));
    122                 break;
     101struct final {
     102        int threads;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->threads = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->key == 0);
     129
     130        threadcounter->threads ++;
     131        threadcounter->packets += res->value.sint;
     132        printf("%d\n", res->value.sint);
     133}
     134
     135static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     136                void *global, void *tls) {
     137
     138        uint32_t *magic = (uint32_t *)global;
     139        struct final *threadcounter = (struct final *)tls;
     140
     141        assert(*magic == 0xabcdef);
     142        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
     143        assert(threadcounter->packets == 100);
     144
     145        free(threadcounter);
     146}
     147
     148static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     149                libtrace_thread_t *t UNUSED,
     150                void *global, void *tls, libtrace_packet_t *packet) {
     151        struct TLS *storage = (struct TLS *)tls;
     152        uint32_t *magic = (uint32_t *)global;
     153        static __thread int count = 0;
     154        int a,*b,c=0;
     155
     156        assert(storage != NULL);
     157        assert(!storage->seen_stop_message);
     158
     159        if (storage->seen_pausing_message)
     160                assert(storage->seen_resuming_message);
     161
     162        assert(*magic == 0xabcdef);
     163
     164        storage->count ++;
     165        count ++;
     166
     167        assert(count == storage->count);
     168
     169        if (count > 100) {
     170                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     171                kill(getpid(), SIGTERM);
     172        }
     173
     174        // Do some work to even out the load on cores
     175        b = &c;
     176        for (a = 0; a < 10000000; a++) {
     177                c += a**b;
     178        }
     179
     180        return packet;
     181}
     182
     183static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     184                void *global) {
     185
     186        static __thread bool seen_start_message = false;
     187        uint32_t *magic = (uint32_t *)global;
     188        struct TLS *storage = NULL;
     189        assert(*magic == 0xabcdef);
     190
     191        assert(!seen_start_message);
     192        assert(trace);
     193
     194        storage = (struct TLS *)malloc(sizeof(struct TLS));
     195        storage->seen_start_message = true;
     196        storage->seen_stop_message = false;
     197        storage->seen_resuming_message = false;
     198        storage->seen_pausing_message = false;
     199        storage->count = 0;
     200
     201        seen_start_message = true;
     202
     203        return storage;
     204}
     205
     206static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
     207                void *global, void *tls) {
     208
     209        static __thread bool seen_stop_message = false;
     210        struct TLS *storage = (struct TLS *)tls;
     211        uint32_t *magic = (uint32_t *)global;
     212
     213        assert(storage != NULL);
     214        assert(!storage->seen_stop_message);
     215        assert(!seen_stop_message);
     216        assert(storage->seen_start_message);
     217        assert(*magic == 0xabcdef);
     218
     219        seen_stop_message = true;
     220        storage->seen_stop_message = true;
     221
     222        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
     223        trace_post_reporter(trace);
     224        free(storage);
     225}
     226
     227static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     228                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     229
     230        fprintf(stderr, "Not expecting a tick packet\n");
     231        kill(getpid(), SIGTERM);
     232}
     233
     234static void pause_processing(libtrace_t *trace UNUSED,
     235                libtrace_thread_t *t UNUSED,
     236                void *global, void *tls) {
     237
     238        static __thread bool seen_pause_message = false;
     239        struct TLS *storage = (struct TLS *)tls;
     240        uint32_t *magic = (uint32_t *)global;
     241
     242        assert(storage != NULL);
     243        assert(!storage->seen_stop_message);
     244        assert(storage->seen_start_message);
     245        assert(*magic == 0xabcdef);
     246
     247        assert(seen_pause_message == storage->seen_pausing_message);
     248
     249        seen_pause_message = true;
     250        storage->seen_pausing_message = true;
     251}
     252
     253static void resume_processing(libtrace_t *trace UNUSED,
     254                libtrace_thread_t *t UNUSED,
     255                void *global, void *tls) {
     256
     257        static __thread bool seen_resume_message = false;
     258        struct TLS *storage = (struct TLS *)tls;
     259        uint32_t *magic = (uint32_t *)global;
     260
     261        assert(storage != NULL);
     262        assert(!storage->seen_stop_message);
     263        assert(storage->seen_start_message);
     264        assert(*magic == 0xabcdef);
     265
     266        assert(seen_resume_message == storage->seen_resuming_message);
     267
     268        seen_resume_message = true;
     269        storage->seen_resuming_message = true;
     270}
     271
     272int main(int argc, char *argv[]) {
     273        int error = 0;
     274        const char *tracename;
     275        libtrace_t *trace;
     276        libtrace_callback_set_t *processing = NULL;
     277        libtrace_callback_set_t *reporter = NULL;
     278        uint32_t global = 0xabcdef;
     279
     280        if (argc<2) {
     281                fprintf(stderr,"usage: %s type\n",argv[0]);
     282                return 1;
    123283        }
    124 }
    125 
    126 static int x;
    127 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    128                         int mesg, libtrace_generic_t data,
    129                         libtrace_thread_t *sender UNUSED) {
    130         struct TLS *tls;
    131         void* ret;
    132         int a,*b,c=0;
    133         tls = trace_get_tls(t);
    134 
    135         switch (mesg) {
    136         case MESSAGE_PACKET:
    137                 assert(tls != NULL);
    138                 assert(!(tls->seen_stop_message));
    139                 tls->count++;
    140                 if (tls->count>100) {
    141                         fprintf(stderr, "Too many packets someone should stop me!!\n");
    142                         kill(getpid(), SIGTERM);
    143                 }
    144                 // Do some work to even out the load on cores
    145                 b = &c;
    146                 for (a = 0; a < 10000000; a++) {
    147                         c += a**b;
    148                 }
    149                 x = c;
    150                 return data.pkt;
    151         case MESSAGE_STARTING:
    152                 assert(tls == NULL);
    153                 tls = calloc(sizeof(struct TLS), 1);
    154                 ret = trace_set_tls(t, tls);
    155                 assert(ret == NULL);
    156                 tls->seen_start_message = true;
    157                 break;
    158         case MESSAGE_STOPPING:
    159                 assert(tls->seen_start_message);
    160                 assert(tls != NULL);
    161                 tls->seen_stop_message = true;
    162                 trace_set_tls(t, NULL);
    163 
    164                 // All threads publish to verify the thread count
    165                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    166                 trace_post_reporter(trace);
    167                 free(tls);
    168                 break;
    169         case MESSAGE_TICK_INTERVAL:
    170         case MESSAGE_TICK_COUNT:
    171                 assert(tls->seen_start_message );
    172                 fprintf(stderr, "Not expecting a tick packet\n");
    173                 kill(getpid(), SIGTERM);
    174                 break;
    175         case MESSAGE_PAUSING:
    176                 assert(tls->seen_start_message);
    177                 tls->seen_pausing_message = true;
    178                 break;
    179         case MESSAGE_RESUMING:
    180                 assert(tls->seen_pausing_message || tls->seen_start_message);
    181                 tls->seen_resuming_message = true;
    182                 break;
    183         }
    184         return NULL;
    185 }
    186 
    187 
    188 /**
    189  * Test that the single threaded fallback works
    190  */
    191 int test_single_threaded(const char *tracename, int expected) {
    192         libtrace_t *trace;
    193         int error = 0;
    194         printf("Testing single threaded\n");
    195 
    196         // Create the trace
     284
     285        tracename = lookup_uri(argv[1]);
     286
    197287        trace = trace_create(tracename);
    198288        iferr(trace,tracename);
    199289
    200         // Enable the single threaded fallback codepath
    201         trace_set_perpkt_threads(trace, 1);
    202 
    203         // Start it
    204         trace_pstart(trace, NULL, per_packet, report_result);
     290        processing = trace_create_callback_set();
     291        trace_set_starting_cb(processing, start_processing);
     292        trace_set_stopping_cb(processing, stop_processing);
     293        trace_set_packet_cb(processing, per_packet);
     294        trace_set_pausing_cb(processing, pause_processing);
     295        trace_set_resuming_cb(processing, resume_processing);
     296        trace_set_tick_count_cb(processing, process_tick);
     297        trace_set_tick_interval_cb(processing, process_tick);
     298
     299        reporter = trace_create_callback_set();
     300        trace_set_starting_cb(reporter, report_start);
     301        trace_set_stopping_cb(reporter, report_end);
     302        trace_set_result_cb(reporter, report_cb);
     303
     304        /* Limit this to just one thread */
     305        trace_set_perpkt_threads(trace, 1);
     306
     307        trace_pstart(trace, &global, processing, reporter);
    205308        iferr(trace,tracename);
    206309
    207         /* Make sure traces survive a pause and restart */
     310        /* Make sure traces survive a pause */
    208311        trace_ppause(trace);
    209312        iferr(trace,tracename);
     
    214317        trace_join(trace);
    215318
     319        global = 0xffffffff;
     320
    216321        /* Now check we have all received all the packets */
    217         if (error == 0) {
    218                 if (totalpkts == expected) {
    219                         printf("success: %d packets read\n",expected);
    220                 } else {
    221                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    222                         error = 1;
    223                 }
    224         } else {
     322        if (error != 0) {
    225323                iferr(trace,tracename);
    226324        }
    227     trace_destroy(trace);
    228     return error;
    229 }
    230 
    231 int main(int argc, char *argv[]) {
    232         int error = 0;
    233         int expected = 100;
    234         const char *tracename;
    235 
    236         if (argc<2) {
    237                 fprintf(stderr,"usage: %s type\n",argv[0]);
    238                 return 1;
    239         }
    240 
    241         tracename = lookup_uri(argv[1]);
    242 
    243         if (strcmp(argv[1],"rtclient")==0) expected=101;
    244 
    245         error = test_single_threaded(tracename, expected);
    246     return error;
    247 }
     325
     326        trace_destroy(trace);
     327        trace_destroy_callback_set(processing);
     328        trace_destroy_callback_set(reporter);
     329        return error;
     330}
  • test/test-format-parallel-stressthreads.c

    r6b98325 r8decff7  
    9191}
    9292
    93 
    9493struct TLS {
    9594        bool seen_start_message;
     
    10099};
    101100
    102 static int totalpkts = 0;
    103 static void report_result(libtrace_t *trace UNUSED, int mesg,
    104                           libtrace_generic_t data,
    105                           libtrace_thread_t *sender UNUSED) {
    106         static int totalthreads = 0;
    107         switch (mesg) {
    108         case MESSAGE_RESULT:
    109                 assert(data.res->key == 0);
    110                 printf("%d,", data.res->value.sint);
    111                 totalthreads++;
    112                 totalpkts += data.res->value.sint;
    113                 break;
    114         case MESSAGE_STARTING:
    115                 // Should have a single thread here
    116                 assert(libtrace_get_perpkt_count(trace) == 100);
    117                 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
    118                 break;
    119         case MESSAGE_STOPPING:
    120                 printf(")\n");
    121                 assert(totalthreads == libtrace_get_perpkt_count(trace));
    122                 break;
     101struct final {
     102        int threads;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->threads = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->key == 0);
     129
     130        threadcounter->threads ++;
     131        threadcounter->packets += res->value.sint;
     132        printf("%d\n", res->value.sint);
     133}
     134
     135static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     136                void *global, void *tls) {
     137
     138        uint32_t *magic = (uint32_t *)global;
     139        struct final *threadcounter = (struct final *)tls;
     140
     141        assert(*magic == 0xabcdef);
     142        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
     143        assert(threadcounter->packets == 100);
     144
     145        free(threadcounter);
     146}
     147
     148static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     149                libtrace_thread_t *t UNUSED,
     150                void *global, void *tls, libtrace_packet_t *packet) {
     151        struct TLS *storage = (struct TLS *)tls;
     152        uint32_t *magic = (uint32_t *)global;
     153        static __thread int count = 0;
     154        int a,*b,c=0;
     155
     156        assert(storage != NULL);
     157        assert(!storage->seen_stop_message);
     158
     159        if (storage->seen_pausing_message)
     160                assert(storage->seen_resuming_message);
     161
     162        assert(*magic == 0xabcdef);
     163
     164        storage->count ++;
     165        count ++;
     166
     167        assert(count == storage->count);
     168
     169        if (count > 100) {
     170                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     171                kill(getpid(), SIGTERM);
     172        }
     173
     174        // Do some work to even out the load on cores
     175        b = &c;
     176        for (a = 0; a < 10000000; a++) {
     177                c += a**b;
     178        }
     179
     180        return packet;
     181}
     182
     183static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     184                void *global) {
     185
     186        static __thread bool seen_start_message = false;
     187        uint32_t *magic = (uint32_t *)global;
     188        struct TLS *storage = NULL;
     189        assert(*magic == 0xabcdef);
     190
     191        assert(!seen_start_message);
     192        assert(trace);
     193
     194        storage = (struct TLS *)malloc(sizeof(struct TLS));
     195        storage->seen_start_message = true;
     196        storage->seen_stop_message = false;
     197        storage->seen_resuming_message = false;
     198        storage->seen_pausing_message = false;
     199        storage->count = 0;
     200
     201        seen_start_message = true;
     202
     203        return storage;
     204}
     205
     206static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
     207                void *global, void *tls) {
     208
     209        static __thread bool seen_stop_message = false;
     210        struct TLS *storage = (struct TLS *)tls;
     211        uint32_t *magic = (uint32_t *)global;
     212
     213        assert(storage != NULL);
     214        assert(!storage->seen_stop_message);
     215        assert(!seen_stop_message);
     216        assert(storage->seen_start_message);
     217        assert(*magic == 0xabcdef);
     218
     219        seen_stop_message = true;
     220        storage->seen_stop_message = true;
     221
     222        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
     223        trace_post_reporter(trace);
     224        free(storage);
     225}
     226
     227static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     228                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     229
     230        fprintf(stderr, "Not expecting a tick packet\n");
     231        kill(getpid(), SIGTERM);
     232}
     233
     234static void pause_processing(libtrace_t *trace UNUSED,
     235                libtrace_thread_t *t UNUSED,
     236                void *global, void *tls) {
     237
     238        static __thread bool seen_pause_message = false;
     239        struct TLS *storage = (struct TLS *)tls;
     240        uint32_t *magic = (uint32_t *)global;
     241
     242        assert(storage != NULL);
     243        assert(!storage->seen_stop_message);
     244        assert(storage->seen_start_message);
     245        assert(*magic == 0xabcdef);
     246
     247        assert(seen_pause_message == storage->seen_pausing_message);
     248
     249        seen_pause_message = true;
     250        storage->seen_pausing_message = true;
     251}
     252
     253static void resume_processing(libtrace_t *trace UNUSED,
     254                libtrace_thread_t *t UNUSED,
     255                void *global, void *tls) {
     256
     257        static __thread bool seen_resume_message = false;
     258        struct TLS *storage = (struct TLS *)tls;
     259        uint32_t *magic = (uint32_t *)global;
     260
     261        assert(storage != NULL);
     262        assert(!storage->seen_stop_message);
     263        assert(storage->seen_start_message);
     264        assert(*magic == 0xabcdef);
     265
     266        assert(seen_resume_message == storage->seen_resuming_message);
     267
     268        seen_resume_message = true;
     269        storage->seen_resuming_message = true;
     270}
     271
     272int main(int argc, char *argv[]) {
     273        int error = 0;
     274        const char *tracename;
     275        libtrace_t *trace;
     276        libtrace_callback_set_t *processing = NULL;
     277        libtrace_callback_set_t *reporter = NULL;
     278        uint32_t global = 0xabcdef;
     279
     280        if (argc<2) {
     281                fprintf(stderr,"usage: %s type\n",argv[0]);
     282                return 1;
    123283        }
    124 }
    125 
    126 static int x;
    127 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    128                         int mesg, libtrace_generic_t data,
    129                         libtrace_thread_t *sender UNUSED) {
    130         struct TLS *tls;
    131         void* ret;
    132         int a,*b,c=0;
    133         tls = trace_get_tls(t);
    134 
    135         switch (mesg) {
    136         case MESSAGE_PACKET:
    137                 assert(tls != NULL);
    138                 assert(!(tls->seen_stop_message));
    139                 tls->count++;
    140                 if (tls->count>100) {
    141                         fprintf(stderr, "Too many packets someone should stop me!!\n");
    142                         kill(getpid(), SIGTERM);
    143                 }
    144                 // Do some work to even out the load on cores
    145                 b = &c;
    146                 for (a = 0; a < 10000000; a++) {
    147                         c += a**b;
    148                 }
    149                 x = c;
    150                 return data.pkt;
    151         case MESSAGE_STARTING:
    152                 assert(tls == NULL);
    153                 tls = calloc(sizeof(struct TLS), 1);
    154                 ret = trace_set_tls(t, tls);
    155                 assert(ret == NULL);
    156                 tls->seen_start_message = true;
    157                 break;
    158         case MESSAGE_STOPPING:
    159                 assert(tls->seen_start_message);
    160                 assert(tls != NULL);
    161                 tls->seen_stop_message = true;
    162                 trace_set_tls(t, NULL);
    163 
    164                 // All threads publish to verify the thread count
    165                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER);
    166                 trace_post_reporter(trace);
    167                 free(tls);
    168                 break;
    169         case MESSAGE_TICK_INTERVAL:
    170         case MESSAGE_TICK_COUNT:
    171                 assert(tls->seen_start_message );
    172                 fprintf(stderr, "Not expecting a tick packet\n");
    173                 kill(getpid(), SIGTERM);
    174                 break;
    175         case MESSAGE_PAUSING:
    176                 assert(tls->seen_start_message);
    177                 tls->seen_pausing_message = true;
    178                 break;
    179         case MESSAGE_RESUMING:
    180                 assert(tls->seen_pausing_message || tls->seen_start_message);
    181                 tls->seen_resuming_message = true;
    182                 break;
    183         }
    184         return NULL;
    185 }
    186 
    187 
    188 /**
    189  * Test that the single threaded fallback works
    190  */
    191 int test_100_threads(const char *tracename, int expected) {
    192         libtrace_t *trace;
    193         int error = 0;
    194         printf("Testing single threaded\n");
    195 
    196         // Create the trace
     284
     285        tracename = lookup_uri(argv[1]);
     286
    197287        trace = trace_create(tracename);
    198288        iferr(trace,tracename);
    199289
    200         // Enable the single threaded fallback codepath
    201         trace_set_perpkt_threads(trace, 100);
    202 
    203         // Start it
    204         trace_pstart(trace, NULL, per_packet, report_result);
     290        processing = trace_create_callback_set();
     291        trace_set_starting_cb(processing, start_processing);
     292        trace_set_stopping_cb(processing, stop_processing);
     293        trace_set_packet_cb(processing, per_packet);
     294        trace_set_pausing_cb(processing, pause_processing);
     295        trace_set_resuming_cb(processing, resume_processing);
     296        trace_set_tick_count_cb(processing, process_tick);
     297        trace_set_tick_interval_cb(processing, process_tick);
     298
     299        reporter = trace_create_callback_set();
     300        trace_set_starting_cb(reporter, report_start);
     301        trace_set_stopping_cb(reporter, report_end);
     302        trace_set_result_cb(reporter, report_cb);
     303
     304
     305        trace_set_perpkt_threads(trace, 100);
     306
     307        trace_pstart(trace, &global, processing, reporter);
    205308        iferr(trace,tracename);
    206309
    207         /* Make sure traces survive a pause and restart */
     310        /* Make sure traces survive a pause */
    208311        trace_ppause(trace);
    209312        iferr(trace,tracename);
     
    214317        trace_join(trace);
    215318
     319        global = 0xffffffff;
     320
    216321        /* Now check we have all received all the packets */
    217         if (error == 0) {
    218                 if (totalpkts == expected) {
    219                         printf("success: %d packets read\n",expected);
    220                 } else {
    221                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    222                         error = 1;
    223                 }
    224         } else {
     322        if (error != 0) {
    225323                iferr(trace,tracename);
    226324        }
    227     trace_destroy(trace);
    228     return error;
    229 }
    230 
    231 int main(int argc, char *argv[]) {
    232         int error = 0;
    233         int expected = 100;
    234         const char *tracename;
    235 
    236         if (argc<2) {
    237                 fprintf(stderr,"usage: %s type\n",argv[0]);
    238                 return 1;
    239         }
    240 
    241         tracename = lookup_uri(argv[1]);
    242 
    243         if (strcmp(argv[1],"rtclient")==0) expected=101;
    244 
    245         error = test_100_threads(tracename, expected);
    246     return error;
    247 }
     325
     326        trace_destroy(trace);
     327        trace_destroy_callback_set(processing);
     328        trace_destroy_callback_set(reporter);
     329        return error;
     330}
  • test/test-format-parallel.c

    r6a6e6a8 r8decff7  
    9999};
    100100
    101 static int totalpkts = 0;
    102 static void report_result(libtrace_t *trace UNUSED, int mesg,
    103                           libtrace_generic_t data,
    104                           libtrace_thread_t *sender UNUSED) {
    105         static int totalthreads = 0;
    106         switch (mesg) {
    107         case MESSAGE_RESULT:
    108                 assert(data.res->key == 0);
    109                 printf("%d,", data.res->value.sint);
    110                 totalthreads++;
    111                 totalpkts += data.res->value.sint;
    112                 break;
    113         case MESSAGE_STARTING:
    114                 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace));
    115                 break;
    116         case MESSAGE_STOPPING:
    117                 printf(")\n");
    118                 assert(totalthreads == libtrace_get_perpkt_count(trace));
    119                 break;
    120         }
    121 }
    122 
    123 int x;
    124 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    125                         int mesg, libtrace_generic_t data,
    126                         libtrace_thread_t *sender UNUSED) {
    127         struct TLS *tls;
    128         void* ret;
     101struct final {
     102        int threads;
     103        int packets;
     104};
     105
     106static void *report_start(libtrace_t *trace UNUSED,
     107                libtrace_thread_t *t UNUSED,
     108                void *global) {
     109        uint32_t *magic = (uint32_t *)global;
     110        struct final *threadcounter =
     111                        (struct final *)malloc(sizeof(struct final));
     112
     113        assert(*magic == 0xabcdef);
     114
     115        threadcounter->threads = 0;
     116        threadcounter->packets = 0;
     117        return threadcounter;
     118}
     119
     120static void report_cb(libtrace_t *trace UNUSED,
     121                libtrace_thread_t *sender UNUSED,
     122                void *global, void *tls, libtrace_result_t *res) {
     123
     124        uint32_t *magic = (uint32_t *)global;
     125        struct final *threadcounter = (struct final *)tls;
     126
     127        assert(*magic == 0xabcdef);
     128        assert(res->key == 0);
     129
     130        threadcounter->threads ++;
     131        threadcounter->packets += res->value.sint;
     132        printf("%d\n", res->value.sint);
     133}
     134
     135static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     136                void *global, void *tls) {
     137
     138        uint32_t *magic = (uint32_t *)global;
     139        struct final *threadcounter = (struct final *)tls;
     140
     141        assert(*magic == 0xabcdef);
     142        assert(threadcounter->threads == trace_get_perpkt_threads(trace));
     143        assert(threadcounter->packets == 100);
     144
     145        free(threadcounter);
     146}
     147
     148static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED,
     149                libtrace_thread_t *t UNUSED,
     150                void *global, void *tls, libtrace_packet_t *packet) {
     151        struct TLS *storage = (struct TLS *)tls;
     152        uint32_t *magic = (uint32_t *)global;
     153        static __thread int count = 0;
    129154        int a,*b,c=0;
    130         // Test internal TLS against __thread
    131         static __thread bool seen_start_message = false;
    132         static __thread bool seen_stop_message = false;
    133         static __thread bool seen_resuming_message = false;
    134         static __thread bool seen_pausing_message = false;
    135         static __thread int count = 0;
    136         tls = trace_get_tls(t);
    137 
    138         switch (mesg) {
    139         case MESSAGE_PACKET:
    140                 assert(tls != NULL);
    141                 assert(!seen_stop_message);
    142                 count++;
    143                 tls->count++;
    144                 if (count>100) {
    145                         fprintf(stderr, "Too many packets someone should stop me!!\n");
    146                         kill(getpid(), SIGTERM);
    147                 }
    148                 // Do some work to even out the load on cores
    149                 b = &c;
    150                 for (a = 0; a < 10000000; a++) {
    151                         c += a**b;
    152                 }
    153                 x = c;
    154                 return data.pkt;
    155         case MESSAGE_STARTING:
    156                 assert(!seen_start_message || seen_resuming_message);
    157                 assert(tls == NULL);
    158                 tls = calloc(sizeof(struct TLS), 1);
    159                 ret = trace_set_tls(t, tls);
    160                 assert(ret == NULL);
    161                 seen_start_message = true;
    162                 tls->seen_start_message = true;
    163                 break;
    164         case MESSAGE_STOPPING:
    165                 assert(seen_start_message);
    166                 assert(tls != NULL);
    167                 assert(tls->seen_start_message);
    168                 assert(tls->count == count);
    169                 seen_stop_message = true;
    170                 tls->seen_stop_message = true;
    171                 free(tls);
    172                 trace_set_tls(t, NULL);
    173 
    174                 // All threads publish to verify the thread count
    175                 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = count}, RESULT_USER);
    176                 trace_post_reporter(trace);
    177                 break;
    178         case MESSAGE_TICK_INTERVAL:
    179         case MESSAGE_TICK_COUNT:
    180                 assert(seen_start_message);
    181                 fprintf(stderr, "Not expecting a tick packet\n");
    182                 kill(getpid(), SIGTERM);
    183                 break;
    184         case MESSAGE_PAUSING:
    185                 assert(seen_start_message);
    186                 seen_pausing_message = true;
    187                 tls->seen_pausing_message = true;
    188                 break;
    189         case MESSAGE_RESUMING:
    190                 assert(tls->seen_pausing_message  || tls->seen_start_message );
    191                 seen_resuming_message = true;
    192                 tls->seen_resuming_message = true;
    193                 break;
    194         }
    195         return NULL;
     155
     156        assert(storage != NULL);
     157        assert(!storage->seen_stop_message);
     158
     159        if (storage->seen_pausing_message)
     160                assert(storage->seen_resuming_message);
     161
     162        assert(*magic == 0xabcdef);
     163
     164        storage->count ++;
     165        count ++;
     166
     167        assert(count == storage->count);
     168
     169        if (count > 100) {
     170                fprintf(stderr, "Too many packets -- someone should stop me!\n");
     171                kill(getpid(), SIGTERM);
     172        }
     173
     174        // Do some work to even out the load on cores
     175        b = &c;
     176        for (a = 0; a < 10000000; a++) {
     177                c += a**b;
     178        }
     179
     180        return packet;
     181}
     182
     183static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED,
     184                void *global) {
     185
     186        static __thread bool seen_start_message = false;
     187        uint32_t *magic = (uint32_t *)global;
     188        struct TLS *storage = NULL;
     189        assert(*magic == 0xabcdef);
     190
     191        assert(!seen_start_message);
     192        assert(trace);
     193
     194        storage = (struct TLS *)malloc(sizeof(struct TLS));
     195        storage->seen_start_message = true;
     196        storage->seen_stop_message = false;
     197        storage->seen_resuming_message = false;
     198        storage->seen_pausing_message = false;
     199        storage->count = 0;
     200
     201        seen_start_message = true;
     202
     203        return storage;
     204}
     205
     206static void stop_processing(libtrace_t *trace, libtrace_thread_t *t,
     207                void *global, void *tls) {
     208
     209        static __thread bool seen_stop_message = false;
     210        struct TLS *storage = (struct TLS *)tls;
     211        uint32_t *magic = (uint32_t *)global;
     212
     213        assert(storage != NULL);
     214        assert(!storage->seen_stop_message);
     215        assert(!seen_stop_message);
     216        assert(storage->seen_start_message);
     217        assert(*magic == 0xabcdef);
     218
     219        seen_stop_message = true;
     220        storage->seen_stop_message = true;
     221
     222        trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER);
     223        trace_post_reporter(trace);
     224        free(storage);
     225}
     226
     227static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED,
     228                void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) {
     229
     230        fprintf(stderr, "Not expecting a tick packet\n");
     231        kill(getpid(), SIGTERM);
     232}
     233
     234static void pause_processing(libtrace_t *trace UNUSED,
     235                libtrace_thread_t *t UNUSED,
     236                void *global, void *tls) {
     237
     238        static __thread bool seen_pause_message = false;
     239        struct TLS *storage = (struct TLS *)tls;
     240        uint32_t *magic = (uint32_t *)global;
     241
     242        assert(storage != NULL);
     243        assert(!storage->seen_stop_message);
     244        assert(storage->seen_start_message);
     245        assert(*magic == 0xabcdef);
     246
     247        assert(seen_pause_message == storage->seen_pausing_message);
     248
     249        seen_pause_message = true;
     250        storage->seen_pausing_message = true;
     251}
     252
     253static void resume_processing(libtrace_t *trace UNUSED,
     254                libtrace_thread_t *t UNUSED,
     255                void *global, void *tls) {
     256
     257        static __thread bool seen_resume_message = false;
     258        struct TLS *storage = (struct TLS *)tls;
     259        uint32_t *magic = (uint32_t *)global;
     260
     261        assert(storage != NULL);
     262        assert(!storage->seen_stop_message);
     263        assert(storage->seen_start_message);
     264        assert(*magic == 0xabcdef);
     265
     266        assert(seen_resume_message == storage->seen_resuming_message);
     267
     268        seen_resume_message = true;
     269        storage->seen_resuming_message = true;
    196270}
    197271
    198272int main(int argc, char *argv[]) {
    199273        int error = 0;
    200         int expected = 100;
    201274        const char *tracename;
    202275        libtrace_t *trace;
     276        libtrace_callback_set_t *processing = NULL;
     277        libtrace_callback_set_t *reporter = NULL;
     278        uint32_t global = 0xabcdef;
    203279
    204280        if (argc<2) {
     
    212288        iferr(trace,tracename);
    213289
    214         if (strcmp(argv[1],"rtclient")==0) expected=101;
    215 
    216         trace_pstart(trace, NULL, per_packet, report_result);
     290        processing = trace_create_callback_set();
     291        trace_set_starting_cb(processing, start_processing);
     292        trace_set_stopping_cb(processing, stop_processing);
     293        trace_set_packet_cb(processing, per_packet);
     294        trace_set_pausing_cb(processing, pause_processing);
     295        trace_set_resuming_cb(processing, resume_processing);
     296        trace_set_tick_count_cb(processing, process_tick);
     297        trace_set_tick_interval_cb(processing, process_tick);
     298
     299        reporter = trace_create_callback_set();
     300        trace_set_starting_cb(reporter, report_start);
     301        trace_set_stopping_cb(reporter, report_end);
     302        trace_set_result_cb(reporter, report_cb);
     303
     304
     305        trace_set_perpkt_threads(trace, 4);
     306
     307        trace_pstart(trace, &global, processing, reporter);
    217308        iferr(trace,tracename);
    218309
     
    226317        trace_join(trace);
    227318
     319        global = 0xffffffff;
     320
    228321        /* Now check we have all received all the packets */
    229         if (error == 0) {
    230                 if (totalpkts == expected) {
    231                         printf("success: %d packets read\n",expected);
    232                 } else {
    233                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    234                         error = 1;
    235                 }
    236         } else {
     322        if (error != 0) {
    237323                iferr(trace,tracename);
    238324        }
    239325
    240     trace_destroy(trace);
    241     return error;
    242 }
     326        trace_destroy(trace);
     327        trace_destroy_callback_set(processing);
     328        trace_destroy_callback_set(reporter);
     329        return error;
     330}
  • test/test-live.c

    r116f970 r8decff7  
    150150{
    151151        int err = 0;
     152        libtrace_stat_t *stat;
     153
     154        stat = trace_create_statistics();
     155
     156        trace_get_statistics(trace_read, stat);
    152157        // Assume no loss here, if not the case we would of hung in reading loop
    153158        // anyway
    154         if (trace_get_dropped_packets(trace_read) != 0) {
    155                 if (trace_get_dropped_packets(trace_read) == UINT64_MAX) {
    156                         printf("\tInfo: trace does not support drop counter\n");
    157                 } else {
    158                         ERROR("Trace dropped %zu packets\n",
    159                                 trace_get_dropped_packets(trace_read));
    160                 }
    161         }
    162         if (trace_get_filtered_packets(trace_read) != 0) {
    163                 if (trace_get_filtered_packets(trace_read) == UINT64_MAX) {
    164                         printf("\tInfo: trace does not support filter counter\n");
    165                 } else {
    166                         ERROR("Trace dropped %zu packets\n",
    167                                 trace_get_filtered_packets(trace_read));
    168                 }
    169         }
    170         if (trace_get_received_packets(trace_read) != 100) {
    171                 if (trace_get_received_packets(trace_read) == UINT64_MAX) {
    172                         printf("\tInfo: trace does not support received counter\n");
    173                 } else {
    174                         ERROR("Trace received %zu packets\n",
    175                                 trace_get_received_packets(trace_read));
    176                 }
    177         }
    178         if (trace_get_accepted_packets(trace_read) != (size_t) test_size) {
    179                 // This would more likely be a libtrace issue rather than format specific
    180                 ERROR("Trace only accepted %zu packets\n",
    181                         trace_get_accepted_packets(trace_read));
    182         }
     159        if (!stat->dropped_valid) {
     160                printf("\tInfo: trace does not support drop counter\n");
     161        } else if (stat->dropped != 0) {
     162                ERROR("Trace dropped %zu packets\n", stat->dropped);
     163        }
     164
     165        if (!stat->filtered_valid) {
     166                printf("\tInfo: trace does not support filter counter\n");
     167        } else if (stat->filtered != 0) {
     168                ERROR("Trace filtered %zu packets\n", stat->filtered);
     169        }
     170
     171        if (!stat->received_valid) {
     172                printf("\tInfo: trace does not support received counter\n");
     173        } else if (stat->received != 100) {
     174                ERROR("Trace received %zu/100 packets\n", stat->received);
     175        }
     176
     177        if (!stat->accepted_valid) {
     178                printf("\tInfo: trace does not support accepted counter\n");
     179        } else if (stat->accepted != (uint32_t) test_size) {
     180                ERROR("Trace only accepted %zu/%u packets\n", stat->accepted,
     181                                (uint32_t)test_size);
     182        }
     183
    183184        return err;
    184185}
  • test/test-tracetime-parallel.c

    r6b98325 r8decff7  
    6262
    6363
    64 static int totalpkts = 0;
    65 static int skippedpkts = 0;
    66 static int expected;
    67 
    6864libtrace_t *trace = NULL;
     65int total = 0;
     66
    6967static void signal_handler(int signal)
    7068{
     
    7371
    7472                /* check within 10 seconds we got 9-11 packets */
    75                 assert(check_range_jitter(10.0, (double) totalpkts, 1.0));
     73                assert(check_range_jitter(10.0, (double) total, 1.0));
    7674
    7775                /* Now fullspeed it */
     
    8381}
    8482
    85 static void report_result(libtrace_t *trace UNUSED, int mesg,
    86                           libtrace_generic_t data,
    87                           libtrace_thread_t *sender UNUSED) {
    88 
    89         switch (mesg) {
    90         case MESSAGE_STARTING:
    91                 break;
    92         case MESSAGE_RESULT:
    93                 switch (data.res->type) {
    94                 case RESULT_USER:
    95                         totalpkts++;
    96                         break;
    97                 case RESULT_USER+1:
    98                         skippedpkts++;
    99                         break;
    100                 }
    101                 break;
    102         }
    103 }
    104 
    105 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t,
    106                         int mesg, libtrace_generic_t data,
    107                         libtrace_thread_t *sender UNUSED) {
     83struct counter {
     84        int total;
     85        int skipped;
     86};
     87
     88static void *start_report(libtrace_t *trace UNUSED,
     89                libtrace_thread_t *t UNUSED, void *global UNUSED) {
     90       
     91        struct counter *c = (struct counter *)malloc(sizeof(struct counter));
     92        c->total = 0;
     93        c->skipped = 0;
     94        return c;
     95
     96}
     97
     98static void stop_report(libtrace_t *trace UNUSED,
     99                libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
     100
     101        struct counter *c = (struct counter *)tls;
     102
     103        assert(c->skipped <= 20);
     104        assert(c->skipped + c->total == 100);
     105
     106        free(c);
     107}
     108
     109static void report_cb(libtrace_t *trace UNUSED,
     110                libtrace_thread_t *sender UNUSED, void *global UNUSED,
     111                void *tls, libtrace_result_t *result) {
     112
     113        struct counter *c = (struct counter *)tls;
     114        if (result->type == RESULT_USER)
     115                c->total ++;
     116        if (result->type == RESULT_USER + 1)
     117                c->skipped ++;
     118
     119        total = c->total;
     120
     121}
     122
     123static void *start_process(libtrace_t *trace UNUSED,
     124                libtrace_thread_t *t UNUSED, void *global UNUSED) {
     125
     126        bool *accepting = (bool *)malloc(sizeof(bool));
     127        *accepting = true;
     128        return accepting;
     129
     130}
     131
     132static void stop_process(libtrace_t *trace UNUSED,
     133                libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
     134        bool *accepting = (bool *)tls;
     135        free(accepting);
     136}
     137
     138static void pause_process(libtrace_t *trace UNUSED,
     139                libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
     140        bool *accepting = (bool *)tls;
     141        *accepting = false;
     142}
     143
     144static void resume_process(libtrace_t *trace UNUSED,
     145                libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) {
     146        bool *accepting = (bool *)tls;
     147        *accepting = true;
     148}
     149
     150static void user_message(libtrace_t *trace UNUSED,
     151                libtrace_thread_t *t UNUSED, void *global UNUSED,
     152                void *tls UNUSED, int msg UNUSED, libtrace_generic_t ts) {
     153
     154        struct timeval tv;
     155        double time;
     156
     157        gettimeofday(&tv, NULL);
     158        time = timeval_to_seconds(tv);
     159
     160        assert(check_range_jitter(ts.rdouble, time, 0.01));
     161}
     162
     163static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t,
     164                void *global UNUSED, void *tls, libtrace_packet_t *packet) {
     165
    108166        struct timeval tv;
    109167        double time;
    110168        libtrace_message_t message;
    111         static __thread bool accepting = true;
     169        bool *accepting = (bool *)tls;
    112170
    113171        gettimeofday(&tv, NULL);
    114172        time = timeval_to_seconds(tv);
    115173
    116         switch (mesg) {
    117         case MESSAGE_PACKET:
    118                 /* In order to instantly pause a trace we don't delay any buffered packets
    119                  * These are sent after MESSAGE_PAUSING has been received */
    120                 if (accepting) {
    121                         fprintf(stderr, ".");
    122                         trace_publish_result(trace, t, (uint64_t) time, (libtrace_generic_t){.rdouble = time}, RESULT_USER);
    123 
    124                         /* Check that we are not blocking regular message delivery */
    125                         message.code = MESSAGE_USER;
    126                         message.sender = t;
    127                         message.data.rdouble = time;
    128                         trace_message_perpkts(trace, &message);
    129                 } else {
    130                         trace_publish_result(trace, t, (uint64_t) time, (libtrace_generic_t){.rdouble = time}, RESULT_USER+1);
    131                 }
    132                 return data.pkt;
    133         case MESSAGE_USER:
    134                 assert (check_range_jitter(data.rdouble, time, 0.01));
    135                 break;
    136         case MESSAGE_RESUMING:
    137                 accepting = true;
    138                 break;
    139         case MESSAGE_PAUSING:
    140                 accepting = false;
    141                 break;
    142         }
    143         return NULL;
    144 }
    145 
     174        if (*accepting) {
     175                fprintf(stderr, ".");
     176                trace_publish_result(trace, t, (uint64_t)time,
     177                                (libtrace_generic_t){.rdouble = time},
     178                                RESULT_USER);
     179                /* Test that we are not interfering with message delivery */
     180                message.code = MESSAGE_USER;
     181                message.sender = t;
     182                message.data.rdouble = time;
     183                trace_message_perpkts(trace, &message);
     184        } else {
     185                trace_publish_result(trace, t, (uint64_t)time,
     186                                (libtrace_generic_t){.rdouble = time},
     187                                RESULT_USER+1);
     188
     189        }
     190        return packet;
     191
     192}
    146193/**
    147194 * Test that tracetime playback functions.
     
    156203        struct timeval tv;
    157204        double start, end;
    158         gettimeofday(&tv, NULL);
     205        libtrace_callback_set_t *processing;
     206        libtrace_callback_set_t *reporter;
     207
     208        gettimeofday(&tv, NULL);
    159209        start = timeval_to_seconds(tv);
    160210        printf("Testing delay\n");
     
    168218        trace_set_tracetime(trace, true);
    169219
     220        processing = trace_create_callback_set();
     221        trace_set_starting_cb(processing, start_process);
     222        trace_set_stopping_cb(processing, stop_process);
     223        trace_set_pausing_cb(processing, pause_process);
     224        trace_set_resuming_cb(processing, resume_process);
     225        trace_set_packet_cb(processing, per_packet);
     226        trace_set_user_message_cb(processing, user_message);
     227
     228        reporter = trace_create_callback_set();
     229        trace_set_starting_cb(reporter, start_report);
     230        trace_set_stopping_cb(reporter, stop_report);
     231        trace_set_result_cb(reporter, report_cb);
     232
     233        trace_set_reporter_thold(trace, 1);
     234
    170235        // Start it
    171         trace_pstart(trace, NULL, per_packet, report_result);
     236        trace_pstart(trace, NULL, processing, reporter);
    172237        iferr(trace,tracename);
    173238        fprintf(stderr, "Running in tracetime (Will take about 10 seconds)\t");
     
    179244        trace_join(trace);
    180245
    181         /* Now check we have all received all the packets */
    182         assert(skippedpkts <= 20); /* Note this is hard coded to the default burst_sizeX2 */
    183         if (error == 0) {
    184                 if (totalpkts + skippedpkts == expected) {
    185                         printf("success: %d packets read\n",expected);
    186                 } else {
    187                         printf("failure: %d packets expected, %d seen\n",expected,totalpkts);
    188                         error = 1;
    189                 }
    190         } else {
     246        if (error != 0) {
    191247                iferr(trace,tracename);
    192248        }
     
    202258        int error = 0;
    203259        const char *tracename;
    204         expected = 100;
    205260
    206261        signal(SIGALRM, signal_handler);
     
    209264
    210265        error = test_tracetime(tracename);
    211 
     266        fprintf(stderr, "\n");
    212267        return error;
    213268}
Note: See TracChangeset for help on using the changeset viewer.