Changeset 8decff7
- Timestamp:
- 09/16/15 13:19:13 (5 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 1101175
- Parents:
- 5478d3d
- Location:
- test
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
test/test-format-parallel-hasher.c
r6b98325 r8decff7 94 94 bool seen_start_message; 95 95 bool seen_stop_message; 96 bool seen_resum ed_message;96 bool seen_resuming_message; 97 97 bool seen_pausing_message; 98 98 int count; 99 99 }; 100 100 101 static int totalpkts = 0; 102 static int expected; 103 static void report_result(libtrace_t *trace UNUSED, int mesg, 104 libtrace_generic_t data, 105 libtrace_thread_t *sender UNUSED) { 106 static int totalthreads = 0; 107 switch (mesg) { 108 case MESSAGE_RESULT: 109 assert(data.res->key == 0); 110 printf("%d,", data.res->value.sint); 111 totalthreads++; 112 totalpkts += data.res->value.sint; 113 assert(data.res->value.sint == 25 || 114 data.res->value.sint == expected - 25); 115 break; 116 case MESSAGE_STARTING: 117 // Should have two threads here 118 assert(libtrace_get_perpkt_count(trace) == 2); 119 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace)); 120 break; 121 case MESSAGE_STOPPING: 122 printf(")\n"); 123 assert(totalthreads == libtrace_get_perpkt_count(trace)); 124 break; 101 struct final { 102 int threads; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->threads = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->key == 0); 129 130 threadcounter->threads ++; 131 threadcounter->packets += res->value.sint; 132 133 assert(res->value.sint == 25 || res->value.sint == 75); 134 printf("%d\n", res->value.sint); 135 } 136 137 static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED, 138 void *global, void *tls) { 139 140 uint32_t *magic = (uint32_t *)global; 141 struct final *threadcounter = (struct final *)tls; 142 143 assert(*magic == 0xabcdef); 144 assert(threadcounter->threads == trace_get_perpkt_threads(trace)); 145 assert(threadcounter->packets == 100); 146 147 free(threadcounter); 148 } 149 150 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 151 libtrace_thread_t *t UNUSED, 152 void *global, void *tls, libtrace_packet_t *packet) { 153 struct TLS *storage = (struct TLS *)tls; 154 uint32_t *magic = (uint32_t *)global; 155 static __thread int count = 0; 156 int a,*b,c=0; 157 158 assert(storage != NULL); 159 assert(!storage->seen_stop_message); 160 161 if (storage->seen_pausing_message) 162 assert(storage->seen_resuming_message); 163 164 assert(*magic == 0xabcdef); 165 166 storage->count ++; 167 count ++; 168 169 assert(count == storage->count); 170 171 if (count > 100) { 172 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 173 kill(getpid(), SIGTERM); 174 } 175 176 // Do some work to even out the load on cores 177 b = &c; 178 for (a = 0; a < 10000000; a++) { 179 c += a**b; 180 } 181 182 return packet; 183 } 184 185 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 186 void *global) { 187 188 static __thread bool seen_start_message = false; 189 uint32_t *magic = (uint32_t *)global; 190 struct TLS *storage = NULL; 191 assert(*magic == 0xabcdef); 192 193 assert(!seen_start_message); 194 assert(trace); 195 196 storage = (struct TLS *)malloc(sizeof(struct TLS)); 197 storage->seen_start_message = true; 198 storage->seen_stop_message = false; 199 storage->seen_resuming_message = false; 200 storage->seen_pausing_message = false; 201 storage->count = 0; 202 203 seen_start_message = true; 204 205 return storage; 206 } 207 208 static void stop_processing(libtrace_t *trace, libtrace_thread_t *t, 209 void *global, void *tls) { 210 211 static __thread bool seen_stop_message = false; 212 struct TLS *storage = (struct TLS *)tls; 213 uint32_t *magic = (uint32_t *)global; 214 215 assert(storage != NULL); 216 assert(!storage->seen_stop_message); 217 assert(!seen_stop_message); 218 assert(storage->seen_start_message); 219 assert(*magic == 0xabcdef); 220 221 seen_stop_message = true; 222 storage->seen_stop_message = true; 223 224 assert(storage->count == 25 || storage->count == 75); 225 226 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER); 227 trace_post_reporter(trace); 228 free(storage); 229 } 230 231 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 232 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 233 234 fprintf(stderr, "Not expecting a tick packet\n"); 235 kill(getpid(), SIGTERM); 236 } 237 238 static void pause_processing(libtrace_t *trace UNUSED, 239 libtrace_thread_t *t UNUSED, 240 void *global, void *tls) { 241 242 static __thread bool seen_pause_message = false; 243 struct TLS *storage = (struct TLS *)tls; 244 uint32_t *magic = (uint32_t *)global; 245 246 assert(storage != NULL); 247 assert(!storage->seen_stop_message); 248 assert(storage->seen_start_message); 249 assert(*magic == 0xabcdef); 250 251 assert(seen_pause_message == storage->seen_pausing_message); 252 253 seen_pause_message = true; 254 storage->seen_pausing_message = true; 255 } 256 257 static void resume_processing(libtrace_t *trace UNUSED, 258 libtrace_thread_t *t UNUSED, 259 void *global, void *tls) { 260 261 static __thread bool seen_resume_message = false; 262 struct TLS *storage = (struct TLS *)tls; 263 uint32_t *magic = (uint32_t *)global; 264 265 assert(storage != NULL); 266 assert(!storage->seen_stop_message); 267 assert(storage->seen_start_message); 268 assert(*magic == 0xabcdef); 269 270 assert(seen_resume_message == storage->seen_resuming_message); 271 272 seen_resume_message = true; 273 storage->seen_resuming_message = true; 274 } 275 276 uint64_t custom_hash(const libtrace_packet_t *packet UNUSED, void *data) { 277 int *count = (int *)data; 278 *count += 1; 279 280 /* Just throw the first 25 packets to thread 0 and the rest to thread 281 * 1. 282 */ 283 if (*count <= 25) 284 return 0; 285 return 1; 286 } 287 288 int main(int argc, char *argv[]) { 289 int error = 0; 290 const char *tracename; 291 libtrace_t *trace; 292 libtrace_callback_set_t *processing = NULL; 293 libtrace_callback_set_t *reporter = NULL; 294 uint32_t global = 0xabcdef; 295 int hashercount = 0; 296 297 if (argc<2) { 298 fprintf(stderr,"usage: %s type\n",argv[0]); 299 return 1; 125 300 } 126 } 127 128 static int x; 129 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 130 int mesg, libtrace_generic_t data, 131 libtrace_thread_t *sender UNUSED) { 132 struct TLS *tls; 133 void* ret; 134 tls = trace_get_tls(t); 135 int a,*b,c=0; 136 137 switch (mesg) { 138 case MESSAGE_PACKET: 139 assert(tls != NULL); 140 assert(!(tls->seen_stop_message)); 141 tls->count++; 142 if (tls->count>100) { 143 fprintf(stderr, "Too many packets someone should stop me!!\n"); 144 kill(getpid(), SIGTERM); 145 } 146 // Do some work to even out the load on cores 147 b = &c; 148 for (a = 0; a < 10000000; a++) { 149 c += a**b; 150 } 151 x = c; 152 return data.pkt; 153 case MESSAGE_STARTING: 154 assert(tls == NULL); 155 tls = calloc(sizeof(struct TLS), 1); 156 ret = trace_set_tls(t, tls); 157 assert(ret == NULL); 158 tls->seen_start_message = true; 159 break; 160 case MESSAGE_STOPPING: 161 assert(tls->seen_start_message); 162 assert(tls != NULL); 163 tls->seen_stop_message = true; 164 trace_set_tls(t, NULL); 165 166 // All threads publish to verify the thread count 167 assert(tls->count == 25 || tls->count == 75); 168 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER); 169 trace_post_reporter(trace); 170 free(tls); 171 break; 172 case MESSAGE_TICK_INTERVAL: 173 case MESSAGE_TICK_COUNT: 174 assert(tls->seen_start_message ); 175 fprintf(stderr, "Not expecting a tick packet\n"); 176 kill(getpid(), SIGTERM); 177 break; 178 case MESSAGE_PAUSING: 179 assert(tls->seen_start_message); 180 tls->seen_pausing_message = true; 181 break; 182 case MESSAGE_RESUMING: 183 assert(tls->seen_pausing_message || tls->seen_start_message); 184 tls->seen_resumed_message = true; 185 break; 186 } 187 return NULL; 188 } 189 190 191 /** 192 * Sends the first 25 packets to thread 0, the next 75 to thread 1 193 * This is based on a few internal workings assumptions, which 194 * might change and still be valid even if this test fails!!. 195 */ 196 uint64_t hash25_75(const libtrace_packet_t* packet UNUSED, void *data) { 197 int *count = (int *) data; 198 *count += 1; 199 if (*count <= 25) 200 return 0; 201 return 1; 202 } 203 204 /** 205 * Test that the hasher function works 206 */ 207 int test_hasher(const char *tracename) { 208 libtrace_t *trace; 209 int error = 0; 210 int hashercount = 0; 211 printf("Testing hasher function\n"); 212 213 // Create the trace 301 302 tracename = lookup_uri(argv[1]); 303 214 304 trace = trace_create(tracename); 215 305 iferr(trace,tracename); 216 306 217 // Always use 2 threads for simplicity 218 trace_set_perpkt_threads(trace, 2); 219 trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount); 220 221 // Start it 222 trace_pstart(trace, NULL, per_packet, report_result); 307 processing = trace_create_callback_set(); 308 trace_set_starting_cb(processing, start_processing); 309 trace_set_stopping_cb(processing, stop_processing); 310 trace_set_packet_cb(processing, per_packet); 311 trace_set_pausing_cb(processing, pause_processing); 312 trace_set_resuming_cb(processing, resume_processing); 313 trace_set_tick_count_cb(processing, process_tick); 314 trace_set_tick_interval_cb(processing, process_tick); 315 316 reporter = trace_create_callback_set(); 317 trace_set_starting_cb(reporter, report_start); 318 trace_set_stopping_cb(reporter, report_end); 319 trace_set_result_cb(reporter, report_cb); 320 321 322 /* Set up our hasher and our two threads */ 323 trace_set_perpkt_threads(trace, 2); 324 trace_set_hasher(trace, HASHER_CUSTOM, &custom_hash, &hashercount); 325 326 trace_pstart(trace, &global, processing, reporter); 223 327 iferr(trace,tracename); 224 /* Make sure traces survive a pause and restart */ 328 329 /* Make sure traces survive a pause */ 225 330 trace_ppause(trace); 226 331 iferr(trace,tracename); … … 231 336 trace_join(trace); 232 337 338 global = 0xffffffff; 339 233 340 /* Now check we have all received all the packets */ 234 if (error == 0) { 235 if (totalpkts == expected) { 236 printf("success: %d packets read\n",expected); 237 } else { 238 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 239 error = 1; 240 } 241 } else { 341 if (error != 0) { 242 342 iferr(trace,tracename); 243 343 } 244 trace_destroy(trace); 245 return error; 246 } 247 248 249 250 int main(int argc, char *argv[]) { 251 int error = 0; 252 const char *tracename; 253 expected = 100; 254 255 if (argc<2) { 256 fprintf(stderr,"usage: %s type\n",argv[0]); 257 return 1; 258 } 259 260 tracename = lookup_uri(argv[1]); 261 262 if (strcmp(argv[1],"rtclient")==0) expected=101; 263 264 error = test_hasher(tracename); 265 266 return error; 267 } 344 345 trace_destroy(trace); 346 trace_destroy_callback_set(processing); 347 trace_destroy_callback_set(reporter); 348 return error; 349 } -
test/test-format-parallel-reporter.c
r7c95027 r8decff7 50 50 #include "dagformat.h" 51 51 #include "libtrace_parallel.h" 52 #include "data-struct/vector.h" 52 53 53 54 void iferr(libtrace_t *trace,const char *msg) … … 90 91 } 91 92 92 int globalcount = 0; 93 94 static void reporter(libtrace_t *libtrace UNUSED, int mesg, 95 libtrace_generic_t data, 96 libtrace_thread_t *sender UNUSED) { 97 static uint64_t last = -1; 98 static int pktcount = 0; 99 libtrace_packet_t *packet; 100 switch (mesg) { 101 case MESSAGE_RESULT: 102 packet = data.res->value.pkt; 103 assert(data.res->key == trace_packet_get_order(packet)); 104 if(last == (uint64_t)-1) { 105 last = data.res->key; 106 } else { 107 assert (last < data.res->key); 108 last = data.res->key; 109 } 110 pktcount++; 111 trace_free_packet(libtrace, packet); 112 break; 113 case MESSAGE_STOPPING: 114 globalcount = pktcount; 115 break; 116 } 117 } 118 119 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 120 int mesg, libtrace_generic_t data, 121 libtrace_thread_t *sender UNUSED) { 122 UNUSED static __thread int x = 0; 123 124 if (mesg == MESSAGE_PACKET) { 125 int a,*b,c=0; 126 // Do some work to even out the load on cores 127 b = &c; 128 for (a = 0; a < 10000000; a++) { 129 c += a**b; 130 } 131 x = c; 132 trace_publish_result(trace, t, trace_packet_get_order(data.pkt), (libtrace_generic_t){.pkt=data.pkt}, RESULT_PACKET); 133 } 134 return NULL; 93 struct TLS { 94 bool seen_start_message; 95 bool seen_stop_message; 96 bool seen_resuming_message; 97 bool seen_pausing_message; 98 int count; 99 }; 100 101 struct final { 102 uint64_t last; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->last = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->type == RESULT_PACKET); 129 130 if (threadcounter->last != 0) 131 assert(threadcounter->last + 1 == res->key); 132 threadcounter->last = res->key; 133 134 threadcounter->packets += 1; 135 trace_free_packet(trace, res->value.pkt); 136 } 137 138 static void report_end(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 139 void *global, void *tls) { 140 141 uint32_t *magic = (uint32_t *)global; 142 struct final *threadcounter = (struct final *)tls; 143 144 assert(*magic == 0xabcdef); 145 assert(threadcounter->packets == 100); 146 147 free(threadcounter); 148 } 149 150 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 151 libtrace_thread_t *t UNUSED, 152 void *global, void *tls, libtrace_packet_t *packet) { 153 struct TLS *storage = (struct TLS *)tls; 154 uint32_t *magic = (uint32_t *)global; 155 static __thread int count = 0; 156 int a,*b,c=0; 157 158 assert(storage != NULL); 159 assert(!storage->seen_stop_message); 160 161 if (storage->seen_pausing_message) 162 assert(storage->seen_resuming_message); 163 164 assert(*magic == 0xabcdef); 165 166 storage->count ++; 167 count ++; 168 169 assert(count == storage->count); 170 171 if (count > 100) { 172 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 173 kill(getpid(), SIGTERM); 174 } 175 176 // Do some work to even out the load on cores 177 b = &c; 178 for (a = 0; a < 10000000; a++) { 179 c += a**b; 180 } 181 182 trace_publish_result(trace, t, trace_packet_get_order(packet), 183 (libtrace_generic_t){.pkt=packet}, RESULT_PACKET); 184 185 return NULL; 186 } 187 188 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 189 void *global) { 190 191 static __thread bool seen_start_message = false; 192 uint32_t *magic = (uint32_t *)global; 193 struct TLS *storage = NULL; 194 assert(*magic == 0xabcdef); 195 196 assert(!seen_start_message); 197 assert(trace); 198 199 storage = (struct TLS *)malloc(sizeof(struct TLS)); 200 storage->seen_start_message = true; 201 storage->seen_stop_message = false; 202 storage->seen_resuming_message = false; 203 storage->seen_pausing_message = false; 204 storage->count = 0; 205 206 seen_start_message = true; 207 208 return storage; 209 } 210 211 static void stop_processing(libtrace_t *trace UNUSED, 212 libtrace_thread_t *t UNUSED, 213 void *global, void *tls) { 214 215 static __thread bool seen_stop_message = false; 216 struct TLS *storage = (struct TLS *)tls; 217 uint32_t *magic = (uint32_t *)global; 218 219 assert(storage != NULL); 220 assert(!storage->seen_stop_message); 221 assert(!seen_stop_message); 222 assert(storage->seen_start_message); 223 assert(*magic == 0xabcdef); 224 225 seen_stop_message = true; 226 storage->seen_stop_message = true; 227 free(storage); 228 } 229 230 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 231 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 232 233 fprintf(stderr, "Not expecting a tick packet\n"); 234 kill(getpid(), SIGTERM); 235 } 236 237 static void pause_processing(libtrace_t *trace UNUSED, 238 libtrace_thread_t *t UNUSED, 239 void *global, void *tls) { 240 241 static __thread bool seen_pause_message = false; 242 struct TLS *storage = (struct TLS *)tls; 243 uint32_t *magic = (uint32_t *)global; 244 245 assert(storage != NULL); 246 assert(!storage->seen_stop_message); 247 assert(storage->seen_start_message); 248 assert(*magic == 0xabcdef); 249 250 assert(seen_pause_message == storage->seen_pausing_message); 251 252 seen_pause_message = true; 253 storage->seen_pausing_message = true; 254 } 255 256 static void resume_processing(libtrace_t *trace UNUSED, 257 libtrace_thread_t *t UNUSED, 258 void *global, void *tls) { 259 260 static __thread bool seen_resume_message = false; 261 struct TLS *storage = (struct TLS *)tls; 262 uint32_t *magic = (uint32_t *)global; 263 264 assert(storage != NULL); 265 assert(!storage->seen_stop_message); 266 assert(storage->seen_start_message); 267 assert(*magic == 0xabcdef); 268 269 assert(seen_resume_message == storage->seen_resuming_message); 270 271 seen_resume_message = true; 272 storage->seen_resuming_message = true; 135 273 } 136 274 137 275 int main(int argc, char *argv[]) { 138 276 int error = 0; 139 int expected = 100;140 277 const char *tracename; 141 278 libtrace_t *trace; 279 libtrace_callback_set_t *processing = NULL; 280 libtrace_callback_set_t *reporter = NULL; 281 uint32_t global = 0xabcdef; 142 282 143 283 if (argc<2) { … … 151 291 iferr(trace,tracename); 152 292 153 if (strcmp(argv[1],"rtclient")==0) expected=101; 154 155 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); 156 157 trace_pstart(trace, NULL, per_packet, reporter); 293 processing = trace_create_callback_set(); 294 trace_set_starting_cb(processing, start_processing); 295 trace_set_stopping_cb(processing, stop_processing); 296 trace_set_packet_cb(processing, per_packet); 297 trace_set_pausing_cb(processing, pause_processing); 298 trace_set_resuming_cb(processing, resume_processing); 299 trace_set_tick_count_cb(processing, process_tick); 300 trace_set_tick_interval_cb(processing, process_tick); 301 302 reporter = trace_create_callback_set(); 303 trace_set_starting_cb(reporter, report_start); 304 trace_set_stopping_cb(reporter, report_end); 305 trace_set_result_cb(reporter, report_cb); 306 307 308 /* Test ordered combiner */ 309 trace_set_perpkt_threads(trace, 4); 310 trace_set_combiner(trace, &combiner_ordered, (libtrace_generic_t){0}); 311 312 trace_pstart(trace, &global, processing, reporter); 158 313 iferr(trace,tracename); 159 314 … … 167 322 trace_join(trace); 168 323 169 if (error == 0) { 170 if (globalcount == expected) { 171 printf("success: %d packets read\n",expected); 172 } else { 173 printf("failure: %d packets expected, %d seen\n",expected,globalcount); 174 error = 1; 175 } 176 } else { 324 global = 0xffffffff; 325 326 /* Now check we have all received all the packets */ 327 if (error != 0) { 177 328 iferr(trace,tracename); 178 329 } 179 trace_destroy(trace); 180 return error; 181 } 330 331 trace_destroy(trace); 332 trace_destroy_callback_set(processing); 333 trace_destroy_callback_set(reporter); 334 return error; 335 } -
test/test-format-parallel-singlethreaded-hasher.c
r6b98325 r8decff7 91 91 } 92 92 93 94 93 struct TLS { 95 94 bool seen_start_message; … … 100 99 }; 101 100 102 static int totalpkts = 0; 103 static int expected; 104 static void report_result(libtrace_t *trace UNUSED, int mesg, 105 libtrace_generic_t data, 106 libtrace_thread_t *sender UNUSED) { 107 static int totalthreads = 0; 108 switch (mesg) { 109 case MESSAGE_RESULT: 110 assert(data.res->key == 0); 111 printf("%d,", data.res->value.sint); 112 totalthreads++; 113 totalpkts += data.res->value.sint; 114 break; 115 case MESSAGE_STARTING: 116 // Should have a single thread here 117 assert(libtrace_get_perpkt_count(trace) == 1); 118 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace)); 119 break; 120 case MESSAGE_STOPPING: 121 printf(")\n"); 122 assert(totalthreads == libtrace_get_perpkt_count(trace)); 123 break; 101 struct final { 102 int threads; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->threads = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->key == 0); 129 130 threadcounter->threads ++; 131 threadcounter->packets += res->value.sint; 132 133 assert(res->value.sint == 100); 134 printf("%d\n", res->value.sint); 135 } 136 137 static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED, 138 void *global, void *tls) { 139 140 uint32_t *magic = (uint32_t *)global; 141 struct final *threadcounter = (struct final *)tls; 142 143 assert(*magic == 0xabcdef); 144 assert(threadcounter->threads == trace_get_perpkt_threads(trace)); 145 assert(threadcounter->packets == 100); 146 147 free(threadcounter); 148 } 149 150 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 151 libtrace_thread_t *t UNUSED, 152 void *global, void *tls, libtrace_packet_t *packet) { 153 struct TLS *storage = (struct TLS *)tls; 154 uint32_t *magic = (uint32_t *)global; 155 static __thread int count = 0; 156 int a,*b,c=0; 157 158 assert(storage != NULL); 159 assert(!storage->seen_stop_message); 160 161 if (storage->seen_pausing_message) 162 assert(storage->seen_resuming_message); 163 164 assert(*magic == 0xabcdef); 165 166 storage->count ++; 167 count ++; 168 169 assert(count == storage->count); 170 171 if (count > 100) { 172 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 173 kill(getpid(), SIGTERM); 174 } 175 176 // Do some work to even out the load on cores 177 b = &c; 178 for (a = 0; a < 10000000; a++) { 179 c += a**b; 180 } 181 182 return packet; 183 } 184 185 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 186 void *global) { 187 188 static __thread bool seen_start_message = false; 189 uint32_t *magic = (uint32_t *)global; 190 struct TLS *storage = NULL; 191 assert(*magic == 0xabcdef); 192 193 assert(!seen_start_message); 194 assert(trace); 195 196 storage = (struct TLS *)malloc(sizeof(struct TLS)); 197 storage->seen_start_message = true; 198 storage->seen_stop_message = false; 199 storage->seen_resuming_message = false; 200 storage->seen_pausing_message = false; 201 storage->count = 0; 202 203 seen_start_message = true; 204 205 return storage; 206 } 207 208 static void stop_processing(libtrace_t *trace, libtrace_thread_t *t, 209 void *global, void *tls) { 210 211 static __thread bool seen_stop_message = false; 212 struct TLS *storage = (struct TLS *)tls; 213 uint32_t *magic = (uint32_t *)global; 214 215 assert(storage != NULL); 216 assert(!storage->seen_stop_message); 217 assert(!seen_stop_message); 218 assert(storage->seen_start_message); 219 assert(*magic == 0xabcdef); 220 221 seen_stop_message = true; 222 storage->seen_stop_message = true; 223 224 assert(storage->count == 100); 225 226 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER); 227 trace_post_reporter(trace); 228 free(storage); 229 } 230 231 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 232 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 233 234 fprintf(stderr, "Not expecting a tick packet\n"); 235 kill(getpid(), SIGTERM); 236 } 237 238 static void pause_processing(libtrace_t *trace UNUSED, 239 libtrace_thread_t *t UNUSED, 240 void *global, void *tls) { 241 242 static __thread bool seen_pause_message = false; 243 struct TLS *storage = (struct TLS *)tls; 244 uint32_t *magic = (uint32_t *)global; 245 246 assert(storage != NULL); 247 assert(!storage->seen_stop_message); 248 assert(storage->seen_start_message); 249 assert(*magic == 0xabcdef); 250 251 assert(seen_pause_message == storage->seen_pausing_message); 252 253 seen_pause_message = true; 254 storage->seen_pausing_message = true; 255 } 256 257 static void resume_processing(libtrace_t *trace UNUSED, 258 libtrace_thread_t *t UNUSED, 259 void *global, void *tls) { 260 261 static __thread bool seen_resume_message = false; 262 struct TLS *storage = (struct TLS *)tls; 263 uint32_t *magic = (uint32_t *)global; 264 265 assert(storage != NULL); 266 assert(!storage->seen_stop_message); 267 assert(storage->seen_start_message); 268 assert(*magic == 0xabcdef); 269 270 assert(seen_resume_message == storage->seen_resuming_message); 271 272 seen_resume_message = true; 273 storage->seen_resuming_message = true; 274 } 275 276 uint64_t custom_hash(const libtrace_packet_t *packet UNUSED, void *data) { 277 int *count = (int *)data; 278 *count += 1; 279 280 /* Just throw the first 25 packets to thread 0 and the rest to thread 281 * 1. 282 */ 283 if (*count <= 25) 284 return 0; 285 return 1; 286 } 287 288 int main(int argc, char *argv[]) { 289 int error = 0; 290 const char *tracename; 291 libtrace_t *trace; 292 libtrace_callback_set_t *processing = NULL; 293 libtrace_callback_set_t *reporter = NULL; 294 uint32_t global = 0xabcdef; 295 int hashercount = 0; 296 297 if (argc<2) { 298 fprintf(stderr,"usage: %s type\n",argv[0]); 299 return 1; 124 300 } 125 } 126 127 static int x; 128 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 129 int mesg, libtrace_generic_t data, 130 libtrace_thread_t *sender UNUSED) { 131 struct TLS *tls; 132 void* ret; 133 int a,*b,c=0; 134 tls = trace_get_tls(t); 135 136 switch (mesg) { 137 case MESSAGE_PACKET: 138 assert(tls != NULL); 139 assert(!(tls->seen_stop_message)); 140 tls->count++; 141 if (tls->count>100) { 142 fprintf(stderr, "Too many packets someone should stop me!!\n"); 143 kill(getpid(), SIGTERM); 144 } 145 // Do some work to even out the load on cores 146 b = &c; 147 for (a = 0; a < 10000000; a++) { 148 c += a**b; 149 } 150 x = c; 151 return data.pkt; 152 case MESSAGE_STARTING: 153 assert(tls == NULL); 154 tls = calloc(sizeof(struct TLS), 1); 155 ret = trace_set_tls(t, tls); 156 assert(ret == NULL); 157 tls->seen_start_message = true; 158 break; 159 case MESSAGE_STOPPING: 160 assert(tls->seen_start_message); 161 assert(tls != NULL); 162 tls->seen_stop_message = true; 163 trace_set_tls(t, NULL); 164 165 // All threads publish to verify the thread count 166 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER); 167 trace_post_reporter(trace); 168 free(tls); 169 break; 170 case MESSAGE_TICK_INTERVAL: 171 case MESSAGE_TICK_COUNT: 172 assert(tls->seen_start_message ); 173 fprintf(stderr, "Not expecting a tick packet\n"); 174 kill(getpid(), SIGTERM); 175 break; 176 case MESSAGE_PAUSING: 177 assert(tls->seen_start_message); 178 tls->seen_pausing_message = true; 179 break; 180 case MESSAGE_RESUMING: 181 assert(tls->seen_pausing_message || tls->seen_start_message); 182 tls->seen_resuming_message = true; 183 break; 184 } 185 return NULL; 186 } 187 188 /** 189 * Sends the first 25 packets to thread 0, the next 75 to thread 1 190 * This is based on a few internal workings assumptions, which 191 * might change and still be valid even if this test fails!!. 192 */ 193 uint64_t hash25_75(const libtrace_packet_t* packet UNUSED, void *data) { 194 int *count = (int *) data; 195 *count += 1; 196 if (*count <= 25) 197 return 0; 198 return 1; 199 } 200 201 /** 202 * Test that the hasher function works in single threaded mode 203 * It might not be called but this ensures consistency 204 */ 205 int test_hasher_singlethreaded(const char *tracename) { 206 libtrace_t *trace; 207 int error = 0; 208 int hashercount = 0; 209 printf("Testing hasher singlethreaded function\n"); 210 211 // Create the trace 301 302 tracename = lookup_uri(argv[1]); 303 212 304 trace = trace_create(tracename); 213 305 iferr(trace,tracename); 214 306 215 // Use 1 thread with a hasher 216 trace_set_perpkt_threads(trace, 1); 217 trace_set_hasher(trace, HASHER_CUSTOM, &hash25_75, &hashercount); 218 219 // Start it 220 trace_pstart(trace, NULL, per_packet, report_result); 307 processing = trace_create_callback_set(); 308 trace_set_starting_cb(processing, start_processing); 309 trace_set_stopping_cb(processing, stop_processing); 310 trace_set_packet_cb(processing, per_packet); 311 trace_set_pausing_cb(processing, pause_processing); 312 trace_set_resuming_cb(processing, resume_processing); 313 trace_set_tick_count_cb(processing, process_tick); 314 trace_set_tick_interval_cb(processing, process_tick); 315 316 reporter = trace_create_callback_set(); 317 trace_set_starting_cb(reporter, report_start); 318 trace_set_stopping_cb(reporter, report_end); 319 trace_set_result_cb(reporter, report_cb); 320 321 322 /* Set up our hasher and our single thread */ 323 trace_set_perpkt_threads(trace, 1); 324 trace_set_hasher(trace, HASHER_CUSTOM, &custom_hash, &hashercount); 325 326 trace_pstart(trace, &global, processing, reporter); 221 327 iferr(trace,tracename); 222 328 223 /* Make sure traces survive a pause and restart*/329 /* Make sure traces survive a pause */ 224 330 trace_ppause(trace); 225 331 iferr(trace,tracename); … … 230 336 trace_join(trace); 231 337 338 global = 0xffffffff; 339 232 340 /* Now check we have all received all the packets */ 233 if (error == 0) { 234 if (totalpkts == expected) { 235 printf("success: %d packets read\n",expected); 236 } else { 237 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 238 error = 1; 239 } 240 } else { 341 if (error != 0) { 241 342 iferr(trace,tracename); 242 343 } 243 trace_destroy(trace); 244 return error; 245 } 246 247 248 int main(int argc, char *argv[]) { 249 int error = 0; 250 const char *tracename; 251 expected = 100; 252 253 if (argc<2) { 254 fprintf(stderr,"usage: %s type\n",argv[0]); 255 return 1; 256 } 257 258 tracename = lookup_uri(argv[1]); 259 260 if (strcmp(argv[1],"rtclient")==0) expected=101; 261 262 error = test_hasher_singlethreaded(tracename); 263 return error; 264 } 344 345 trace_destroy(trace); 346 trace_destroy_callback_set(processing); 347 trace_destroy_callback_set(reporter); 348 return error; 349 } -
test/test-format-parallel-singlethreaded.c
r6b98325 r8decff7 91 91 } 92 92 93 94 93 struct TLS { 95 94 bool seen_start_message; … … 100 99 }; 101 100 102 static int totalpkts = 0; 103 static void report_result(libtrace_t *trace UNUSED, int mesg, 104 libtrace_generic_t data, 105 libtrace_thread_t *sender UNUSED) { 106 static int totalthreads = 0; 107 switch (mesg) { 108 case MESSAGE_RESULT: 109 assert(data.res->key == 0); 110 printf("%d,", data.res->value.sint); 111 totalthreads++; 112 totalpkts += data.res->value.sint; 113 break; 114 case MESSAGE_STARTING: 115 // Should have a single thread here 116 assert(libtrace_get_perpkt_count(trace) == 1); 117 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace)); 118 break; 119 case MESSAGE_STOPPING: 120 printf(")\n"); 121 assert(totalthreads == libtrace_get_perpkt_count(trace)); 122 break; 101 struct final { 102 int threads; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->threads = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->key == 0); 129 130 threadcounter->threads ++; 131 threadcounter->packets += res->value.sint; 132 printf("%d\n", res->value.sint); 133 } 134 135 static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED, 136 void *global, void *tls) { 137 138 uint32_t *magic = (uint32_t *)global; 139 struct final *threadcounter = (struct final *)tls; 140 141 assert(*magic == 0xabcdef); 142 assert(threadcounter->threads == trace_get_perpkt_threads(trace)); 143 assert(threadcounter->packets == 100); 144 145 free(threadcounter); 146 } 147 148 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 149 libtrace_thread_t *t UNUSED, 150 void *global, void *tls, libtrace_packet_t *packet) { 151 struct TLS *storage = (struct TLS *)tls; 152 uint32_t *magic = (uint32_t *)global; 153 static __thread int count = 0; 154 int a,*b,c=0; 155 156 assert(storage != NULL); 157 assert(!storage->seen_stop_message); 158 159 if (storage->seen_pausing_message) 160 assert(storage->seen_resuming_message); 161 162 assert(*magic == 0xabcdef); 163 164 storage->count ++; 165 count ++; 166 167 assert(count == storage->count); 168 169 if (count > 100) { 170 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 171 kill(getpid(), SIGTERM); 172 } 173 174 // Do some work to even out the load on cores 175 b = &c; 176 for (a = 0; a < 10000000; a++) { 177 c += a**b; 178 } 179 180 return packet; 181 } 182 183 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 184 void *global) { 185 186 static __thread bool seen_start_message = false; 187 uint32_t *magic = (uint32_t *)global; 188 struct TLS *storage = NULL; 189 assert(*magic == 0xabcdef); 190 191 assert(!seen_start_message); 192 assert(trace); 193 194 storage = (struct TLS *)malloc(sizeof(struct TLS)); 195 storage->seen_start_message = true; 196 storage->seen_stop_message = false; 197 storage->seen_resuming_message = false; 198 storage->seen_pausing_message = false; 199 storage->count = 0; 200 201 seen_start_message = true; 202 203 return storage; 204 } 205 206 static void stop_processing(libtrace_t *trace, libtrace_thread_t *t, 207 void *global, void *tls) { 208 209 static __thread bool seen_stop_message = false; 210 struct TLS *storage = (struct TLS *)tls; 211 uint32_t *magic = (uint32_t *)global; 212 213 assert(storage != NULL); 214 assert(!storage->seen_stop_message); 215 assert(!seen_stop_message); 216 assert(storage->seen_start_message); 217 assert(*magic == 0xabcdef); 218 219 seen_stop_message = true; 220 storage->seen_stop_message = true; 221 222 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER); 223 trace_post_reporter(trace); 224 free(storage); 225 } 226 227 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 228 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 229 230 fprintf(stderr, "Not expecting a tick packet\n"); 231 kill(getpid(), SIGTERM); 232 } 233 234 static void pause_processing(libtrace_t *trace UNUSED, 235 libtrace_thread_t *t UNUSED, 236 void *global, void *tls) { 237 238 static __thread bool seen_pause_message = false; 239 struct TLS *storage = (struct TLS *)tls; 240 uint32_t *magic = (uint32_t *)global; 241 242 assert(storage != NULL); 243 assert(!storage->seen_stop_message); 244 assert(storage->seen_start_message); 245 assert(*magic == 0xabcdef); 246 247 assert(seen_pause_message == storage->seen_pausing_message); 248 249 seen_pause_message = true; 250 storage->seen_pausing_message = true; 251 } 252 253 static void resume_processing(libtrace_t *trace UNUSED, 254 libtrace_thread_t *t UNUSED, 255 void *global, void *tls) { 256 257 static __thread bool seen_resume_message = false; 258 struct TLS *storage = (struct TLS *)tls; 259 uint32_t *magic = (uint32_t *)global; 260 261 assert(storage != NULL); 262 assert(!storage->seen_stop_message); 263 assert(storage->seen_start_message); 264 assert(*magic == 0xabcdef); 265 266 assert(seen_resume_message == storage->seen_resuming_message); 267 268 seen_resume_message = true; 269 storage->seen_resuming_message = true; 270 } 271 272 int main(int argc, char *argv[]) { 273 int error = 0; 274 const char *tracename; 275 libtrace_t *trace; 276 libtrace_callback_set_t *processing = NULL; 277 libtrace_callback_set_t *reporter = NULL; 278 uint32_t global = 0xabcdef; 279 280 if (argc<2) { 281 fprintf(stderr,"usage: %s type\n",argv[0]); 282 return 1; 123 283 } 124 } 125 126 static int x; 127 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 128 int mesg, libtrace_generic_t data, 129 libtrace_thread_t *sender UNUSED) { 130 struct TLS *tls; 131 void* ret; 132 int a,*b,c=0; 133 tls = trace_get_tls(t); 134 135 switch (mesg) { 136 case MESSAGE_PACKET: 137 assert(tls != NULL); 138 assert(!(tls->seen_stop_message)); 139 tls->count++; 140 if (tls->count>100) { 141 fprintf(stderr, "Too many packets someone should stop me!!\n"); 142 kill(getpid(), SIGTERM); 143 } 144 // Do some work to even out the load on cores 145 b = &c; 146 for (a = 0; a < 10000000; a++) { 147 c += a**b; 148 } 149 x = c; 150 return data.pkt; 151 case MESSAGE_STARTING: 152 assert(tls == NULL); 153 tls = calloc(sizeof(struct TLS), 1); 154 ret = trace_set_tls(t, tls); 155 assert(ret == NULL); 156 tls->seen_start_message = true; 157 break; 158 case MESSAGE_STOPPING: 159 assert(tls->seen_start_message); 160 assert(tls != NULL); 161 tls->seen_stop_message = true; 162 trace_set_tls(t, NULL); 163 164 // All threads publish to verify the thread count 165 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER); 166 trace_post_reporter(trace); 167 free(tls); 168 break; 169 case MESSAGE_TICK_INTERVAL: 170 case MESSAGE_TICK_COUNT: 171 assert(tls->seen_start_message ); 172 fprintf(stderr, "Not expecting a tick packet\n"); 173 kill(getpid(), SIGTERM); 174 break; 175 case MESSAGE_PAUSING: 176 assert(tls->seen_start_message); 177 tls->seen_pausing_message = true; 178 break; 179 case MESSAGE_RESUMING: 180 assert(tls->seen_pausing_message || tls->seen_start_message); 181 tls->seen_resuming_message = true; 182 break; 183 } 184 return NULL; 185 } 186 187 188 /** 189 * Test that the single threaded fallback works 190 */ 191 int test_single_threaded(const char *tracename, int expected) { 192 libtrace_t *trace; 193 int error = 0; 194 printf("Testing single threaded\n"); 195 196 // Create the trace 284 285 tracename = lookup_uri(argv[1]); 286 197 287 trace = trace_create(tracename); 198 288 iferr(trace,tracename); 199 289 200 // Enable the single threaded fallback codepath 201 trace_set_perpkt_threads(trace, 1); 202 203 // Start it 204 trace_pstart(trace, NULL, per_packet, report_result); 290 processing = trace_create_callback_set(); 291 trace_set_starting_cb(processing, start_processing); 292 trace_set_stopping_cb(processing, stop_processing); 293 trace_set_packet_cb(processing, per_packet); 294 trace_set_pausing_cb(processing, pause_processing); 295 trace_set_resuming_cb(processing, resume_processing); 296 trace_set_tick_count_cb(processing, process_tick); 297 trace_set_tick_interval_cb(processing, process_tick); 298 299 reporter = trace_create_callback_set(); 300 trace_set_starting_cb(reporter, report_start); 301 trace_set_stopping_cb(reporter, report_end); 302 trace_set_result_cb(reporter, report_cb); 303 304 /* Limit this to just one thread */ 305 trace_set_perpkt_threads(trace, 1); 306 307 trace_pstart(trace, &global, processing, reporter); 205 308 iferr(trace,tracename); 206 309 207 /* Make sure traces survive a pause and restart*/310 /* Make sure traces survive a pause */ 208 311 trace_ppause(trace); 209 312 iferr(trace,tracename); … … 214 317 trace_join(trace); 215 318 319 global = 0xffffffff; 320 216 321 /* Now check we have all received all the packets */ 217 if (error == 0) { 218 if (totalpkts == expected) { 219 printf("success: %d packets read\n",expected); 220 } else { 221 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 222 error = 1; 223 } 224 } else { 322 if (error != 0) { 225 323 iferr(trace,tracename); 226 324 } 227 trace_destroy(trace); 228 return error; 229 } 230 231 int main(int argc, char *argv[]) { 232 int error = 0; 233 int expected = 100; 234 const char *tracename; 235 236 if (argc<2) { 237 fprintf(stderr,"usage: %s type\n",argv[0]); 238 return 1; 239 } 240 241 tracename = lookup_uri(argv[1]); 242 243 if (strcmp(argv[1],"rtclient")==0) expected=101; 244 245 error = test_single_threaded(tracename, expected); 246 return error; 247 } 325 326 trace_destroy(trace); 327 trace_destroy_callback_set(processing); 328 trace_destroy_callback_set(reporter); 329 return error; 330 } -
test/test-format-parallel-stressthreads.c
r6b98325 r8decff7 91 91 } 92 92 93 94 93 struct TLS { 95 94 bool seen_start_message; … … 100 99 }; 101 100 102 static int totalpkts = 0; 103 static void report_result(libtrace_t *trace UNUSED, int mesg, 104 libtrace_generic_t data, 105 libtrace_thread_t *sender UNUSED) { 106 static int totalthreads = 0; 107 switch (mesg) { 108 case MESSAGE_RESULT: 109 assert(data.res->key == 0); 110 printf("%d,", data.res->value.sint); 111 totalthreads++; 112 totalpkts += data.res->value.sint; 113 break; 114 case MESSAGE_STARTING: 115 // Should have a single thread here 116 assert(libtrace_get_perpkt_count(trace) == 100); 117 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace)); 118 break; 119 case MESSAGE_STOPPING: 120 printf(")\n"); 121 assert(totalthreads == libtrace_get_perpkt_count(trace)); 122 break; 101 struct final { 102 int threads; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->threads = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->key == 0); 129 130 threadcounter->threads ++; 131 threadcounter->packets += res->value.sint; 132 printf("%d\n", res->value.sint); 133 } 134 135 static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED, 136 void *global, void *tls) { 137 138 uint32_t *magic = (uint32_t *)global; 139 struct final *threadcounter = (struct final *)tls; 140 141 assert(*magic == 0xabcdef); 142 assert(threadcounter->threads == trace_get_perpkt_threads(trace)); 143 assert(threadcounter->packets == 100); 144 145 free(threadcounter); 146 } 147 148 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 149 libtrace_thread_t *t UNUSED, 150 void *global, void *tls, libtrace_packet_t *packet) { 151 struct TLS *storage = (struct TLS *)tls; 152 uint32_t *magic = (uint32_t *)global; 153 static __thread int count = 0; 154 int a,*b,c=0; 155 156 assert(storage != NULL); 157 assert(!storage->seen_stop_message); 158 159 if (storage->seen_pausing_message) 160 assert(storage->seen_resuming_message); 161 162 assert(*magic == 0xabcdef); 163 164 storage->count ++; 165 count ++; 166 167 assert(count == storage->count); 168 169 if (count > 100) { 170 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 171 kill(getpid(), SIGTERM); 172 } 173 174 // Do some work to even out the load on cores 175 b = &c; 176 for (a = 0; a < 10000000; a++) { 177 c += a**b; 178 } 179 180 return packet; 181 } 182 183 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 184 void *global) { 185 186 static __thread bool seen_start_message = false; 187 uint32_t *magic = (uint32_t *)global; 188 struct TLS *storage = NULL; 189 assert(*magic == 0xabcdef); 190 191 assert(!seen_start_message); 192 assert(trace); 193 194 storage = (struct TLS *)malloc(sizeof(struct TLS)); 195 storage->seen_start_message = true; 196 storage->seen_stop_message = false; 197 storage->seen_resuming_message = false; 198 storage->seen_pausing_message = false; 199 storage->count = 0; 200 201 seen_start_message = true; 202 203 return storage; 204 } 205 206 static void stop_processing(libtrace_t *trace, libtrace_thread_t *t, 207 void *global, void *tls) { 208 209 static __thread bool seen_stop_message = false; 210 struct TLS *storage = (struct TLS *)tls; 211 uint32_t *magic = (uint32_t *)global; 212 213 assert(storage != NULL); 214 assert(!storage->seen_stop_message); 215 assert(!seen_stop_message); 216 assert(storage->seen_start_message); 217 assert(*magic == 0xabcdef); 218 219 seen_stop_message = true; 220 storage->seen_stop_message = true; 221 222 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER); 223 trace_post_reporter(trace); 224 free(storage); 225 } 226 227 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 228 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 229 230 fprintf(stderr, "Not expecting a tick packet\n"); 231 kill(getpid(), SIGTERM); 232 } 233 234 static void pause_processing(libtrace_t *trace UNUSED, 235 libtrace_thread_t *t UNUSED, 236 void *global, void *tls) { 237 238 static __thread bool seen_pause_message = false; 239 struct TLS *storage = (struct TLS *)tls; 240 uint32_t *magic = (uint32_t *)global; 241 242 assert(storage != NULL); 243 assert(!storage->seen_stop_message); 244 assert(storage->seen_start_message); 245 assert(*magic == 0xabcdef); 246 247 assert(seen_pause_message == storage->seen_pausing_message); 248 249 seen_pause_message = true; 250 storage->seen_pausing_message = true; 251 } 252 253 static void resume_processing(libtrace_t *trace UNUSED, 254 libtrace_thread_t *t UNUSED, 255 void *global, void *tls) { 256 257 static __thread bool seen_resume_message = false; 258 struct TLS *storage = (struct TLS *)tls; 259 uint32_t *magic = (uint32_t *)global; 260 261 assert(storage != NULL); 262 assert(!storage->seen_stop_message); 263 assert(storage->seen_start_message); 264 assert(*magic == 0xabcdef); 265 266 assert(seen_resume_message == storage->seen_resuming_message); 267 268 seen_resume_message = true; 269 storage->seen_resuming_message = true; 270 } 271 272 int main(int argc, char *argv[]) { 273 int error = 0; 274 const char *tracename; 275 libtrace_t *trace; 276 libtrace_callback_set_t *processing = NULL; 277 libtrace_callback_set_t *reporter = NULL; 278 uint32_t global = 0xabcdef; 279 280 if (argc<2) { 281 fprintf(stderr,"usage: %s type\n",argv[0]); 282 return 1; 123 283 } 124 } 125 126 static int x; 127 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 128 int mesg, libtrace_generic_t data, 129 libtrace_thread_t *sender UNUSED) { 130 struct TLS *tls; 131 void* ret; 132 int a,*b,c=0; 133 tls = trace_get_tls(t); 134 135 switch (mesg) { 136 case MESSAGE_PACKET: 137 assert(tls != NULL); 138 assert(!(tls->seen_stop_message)); 139 tls->count++; 140 if (tls->count>100) { 141 fprintf(stderr, "Too many packets someone should stop me!!\n"); 142 kill(getpid(), SIGTERM); 143 } 144 // Do some work to even out the load on cores 145 b = &c; 146 for (a = 0; a < 10000000; a++) { 147 c += a**b; 148 } 149 x = c; 150 return data.pkt; 151 case MESSAGE_STARTING: 152 assert(tls == NULL); 153 tls = calloc(sizeof(struct TLS), 1); 154 ret = trace_set_tls(t, tls); 155 assert(ret == NULL); 156 tls->seen_start_message = true; 157 break; 158 case MESSAGE_STOPPING: 159 assert(tls->seen_start_message); 160 assert(tls != NULL); 161 tls->seen_stop_message = true; 162 trace_set_tls(t, NULL); 163 164 // All threads publish to verify the thread count 165 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint=tls->count}, RESULT_USER); 166 trace_post_reporter(trace); 167 free(tls); 168 break; 169 case MESSAGE_TICK_INTERVAL: 170 case MESSAGE_TICK_COUNT: 171 assert(tls->seen_start_message ); 172 fprintf(stderr, "Not expecting a tick packet\n"); 173 kill(getpid(), SIGTERM); 174 break; 175 case MESSAGE_PAUSING: 176 assert(tls->seen_start_message); 177 tls->seen_pausing_message = true; 178 break; 179 case MESSAGE_RESUMING: 180 assert(tls->seen_pausing_message || tls->seen_start_message); 181 tls->seen_resuming_message = true; 182 break; 183 } 184 return NULL; 185 } 186 187 188 /** 189 * Test that the single threaded fallback works 190 */ 191 int test_100_threads(const char *tracename, int expected) { 192 libtrace_t *trace; 193 int error = 0; 194 printf("Testing single threaded\n"); 195 196 // Create the trace 284 285 tracename = lookup_uri(argv[1]); 286 197 287 trace = trace_create(tracename); 198 288 iferr(trace,tracename); 199 289 200 // Enable the single threaded fallback codepath 201 trace_set_perpkt_threads(trace, 100); 202 203 // Start it 204 trace_pstart(trace, NULL, per_packet, report_result); 290 processing = trace_create_callback_set(); 291 trace_set_starting_cb(processing, start_processing); 292 trace_set_stopping_cb(processing, stop_processing); 293 trace_set_packet_cb(processing, per_packet); 294 trace_set_pausing_cb(processing, pause_processing); 295 trace_set_resuming_cb(processing, resume_processing); 296 trace_set_tick_count_cb(processing, process_tick); 297 trace_set_tick_interval_cb(processing, process_tick); 298 299 reporter = trace_create_callback_set(); 300 trace_set_starting_cb(reporter, report_start); 301 trace_set_stopping_cb(reporter, report_end); 302 trace_set_result_cb(reporter, report_cb); 303 304 305 trace_set_perpkt_threads(trace, 100); 306 307 trace_pstart(trace, &global, processing, reporter); 205 308 iferr(trace,tracename); 206 309 207 /* Make sure traces survive a pause and restart*/310 /* Make sure traces survive a pause */ 208 311 trace_ppause(trace); 209 312 iferr(trace,tracename); … … 214 317 trace_join(trace); 215 318 319 global = 0xffffffff; 320 216 321 /* Now check we have all received all the packets */ 217 if (error == 0) { 218 if (totalpkts == expected) { 219 printf("success: %d packets read\n",expected); 220 } else { 221 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 222 error = 1; 223 } 224 } else { 322 if (error != 0) { 225 323 iferr(trace,tracename); 226 324 } 227 trace_destroy(trace); 228 return error; 229 } 230 231 int main(int argc, char *argv[]) { 232 int error = 0; 233 int expected = 100; 234 const char *tracename; 235 236 if (argc<2) { 237 fprintf(stderr,"usage: %s type\n",argv[0]); 238 return 1; 239 } 240 241 tracename = lookup_uri(argv[1]); 242 243 if (strcmp(argv[1],"rtclient")==0) expected=101; 244 245 error = test_100_threads(tracename, expected); 246 return error; 247 } 325 326 trace_destroy(trace); 327 trace_destroy_callback_set(processing); 328 trace_destroy_callback_set(reporter); 329 return error; 330 } -
test/test-format-parallel.c
r6a6e6a8 r8decff7 99 99 }; 100 100 101 static int totalpkts = 0; 102 static void report_result(libtrace_t *trace UNUSED, int mesg, 103 libtrace_generic_t data, 104 libtrace_thread_t *sender UNUSED) { 105 static int totalthreads = 0; 106 switch (mesg) { 107 case MESSAGE_RESULT: 108 assert(data.res->key == 0); 109 printf("%d,", data.res->value.sint); 110 totalthreads++; 111 totalpkts += data.res->value.sint; 112 break; 113 case MESSAGE_STARTING: 114 printf("\tLooks like %d threads are being used!\n\tcounts(", libtrace_get_perpkt_count(trace)); 115 break; 116 case MESSAGE_STOPPING: 117 printf(")\n"); 118 assert(totalthreads == libtrace_get_perpkt_count(trace)); 119 break; 120 } 121 } 122 123 int x; 124 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 125 int mesg, libtrace_generic_t data, 126 libtrace_thread_t *sender UNUSED) { 127 struct TLS *tls; 128 void* ret; 101 struct final { 102 int threads; 103 int packets; 104 }; 105 106 static void *report_start(libtrace_t *trace UNUSED, 107 libtrace_thread_t *t UNUSED, 108 void *global) { 109 uint32_t *magic = (uint32_t *)global; 110 struct final *threadcounter = 111 (struct final *)malloc(sizeof(struct final)); 112 113 assert(*magic == 0xabcdef); 114 115 threadcounter->threads = 0; 116 threadcounter->packets = 0; 117 return threadcounter; 118 } 119 120 static void report_cb(libtrace_t *trace UNUSED, 121 libtrace_thread_t *sender UNUSED, 122 void *global, void *tls, libtrace_result_t *res) { 123 124 uint32_t *magic = (uint32_t *)global; 125 struct final *threadcounter = (struct final *)tls; 126 127 assert(*magic == 0xabcdef); 128 assert(res->key == 0); 129 130 threadcounter->threads ++; 131 threadcounter->packets += res->value.sint; 132 printf("%d\n", res->value.sint); 133 } 134 135 static void report_end(libtrace_t *trace, libtrace_thread_t *t UNUSED, 136 void *global, void *tls) { 137 138 uint32_t *magic = (uint32_t *)global; 139 struct final *threadcounter = (struct final *)tls; 140 141 assert(*magic == 0xabcdef); 142 assert(threadcounter->threads == trace_get_perpkt_threads(trace)); 143 assert(threadcounter->packets == 100); 144 145 free(threadcounter); 146 } 147 148 static libtrace_packet_t *per_packet(libtrace_t *trace UNUSED, 149 libtrace_thread_t *t UNUSED, 150 void *global, void *tls, libtrace_packet_t *packet) { 151 struct TLS *storage = (struct TLS *)tls; 152 uint32_t *magic = (uint32_t *)global; 153 static __thread int count = 0; 129 154 int a,*b,c=0; 130 // Test internal TLS against __thread 131 static __thread bool seen_start_message = false; 132 static __thread bool seen_stop_message = false; 133 static __thread bool seen_resuming_message = false; 134 static __thread bool seen_pausing_message = false; 135 static __thread int count = 0; 136 tls = trace_get_tls(t); 137 138 switch (mesg) { 139 case MESSAGE_PACKET: 140 assert(tls != NULL); 141 assert(!seen_stop_message); 142 count++; 143 tls->count++; 144 if (count>100) { 145 fprintf(stderr, "Too many packets someone should stop me!!\n"); 146 kill(getpid(), SIGTERM); 147 } 148 // Do some work to even out the load on cores 149 b = &c; 150 for (a = 0; a < 10000000; a++) { 151 c += a**b; 152 } 153 x = c; 154 return data.pkt; 155 case MESSAGE_STARTING: 156 assert(!seen_start_message || seen_resuming_message); 157 assert(tls == NULL); 158 tls = calloc(sizeof(struct TLS), 1); 159 ret = trace_set_tls(t, tls); 160 assert(ret == NULL); 161 seen_start_message = true; 162 tls->seen_start_message = true; 163 break; 164 case MESSAGE_STOPPING: 165 assert(seen_start_message); 166 assert(tls != NULL); 167 assert(tls->seen_start_message); 168 assert(tls->count == count); 169 seen_stop_message = true; 170 tls->seen_stop_message = true; 171 free(tls); 172 trace_set_tls(t, NULL); 173 174 // All threads publish to verify the thread count 175 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = count}, RESULT_USER); 176 trace_post_reporter(trace); 177 break; 178 case MESSAGE_TICK_INTERVAL: 179 case MESSAGE_TICK_COUNT: 180 assert(seen_start_message); 181 fprintf(stderr, "Not expecting a tick packet\n"); 182 kill(getpid(), SIGTERM); 183 break; 184 case MESSAGE_PAUSING: 185 assert(seen_start_message); 186 seen_pausing_message = true; 187 tls->seen_pausing_message = true; 188 break; 189 case MESSAGE_RESUMING: 190 assert(tls->seen_pausing_message || tls->seen_start_message ); 191 seen_resuming_message = true; 192 tls->seen_resuming_message = true; 193 break; 194 } 195 return NULL; 155 156 assert(storage != NULL); 157 assert(!storage->seen_stop_message); 158 159 if (storage->seen_pausing_message) 160 assert(storage->seen_resuming_message); 161 162 assert(*magic == 0xabcdef); 163 164 storage->count ++; 165 count ++; 166 167 assert(count == storage->count); 168 169 if (count > 100) { 170 fprintf(stderr, "Too many packets -- someone should stop me!\n"); 171 kill(getpid(), SIGTERM); 172 } 173 174 // Do some work to even out the load on cores 175 b = &c; 176 for (a = 0; a < 10000000; a++) { 177 c += a**b; 178 } 179 180 return packet; 181 } 182 183 static void *start_processing(libtrace_t *trace, libtrace_thread_t *t UNUSED, 184 void *global) { 185 186 static __thread bool seen_start_message = false; 187 uint32_t *magic = (uint32_t *)global; 188 struct TLS *storage = NULL; 189 assert(*magic == 0xabcdef); 190 191 assert(!seen_start_message); 192 assert(trace); 193 194 storage = (struct TLS *)malloc(sizeof(struct TLS)); 195 storage->seen_start_message = true; 196 storage->seen_stop_message = false; 197 storage->seen_resuming_message = false; 198 storage->seen_pausing_message = false; 199 storage->count = 0; 200 201 seen_start_message = true; 202 203 return storage; 204 } 205 206 static void stop_processing(libtrace_t *trace, libtrace_thread_t *t, 207 void *global, void *tls) { 208 209 static __thread bool seen_stop_message = false; 210 struct TLS *storage = (struct TLS *)tls; 211 uint32_t *magic = (uint32_t *)global; 212 213 assert(storage != NULL); 214 assert(!storage->seen_stop_message); 215 assert(!seen_stop_message); 216 assert(storage->seen_start_message); 217 assert(*magic == 0xabcdef); 218 219 seen_stop_message = true; 220 storage->seen_stop_message = true; 221 222 trace_publish_result(trace, t, (uint64_t) 0, (libtrace_generic_t){.sint = storage->count}, RESULT_USER); 223 trace_post_reporter(trace); 224 free(storage); 225 } 226 227 static void process_tick(libtrace_t *trace UNUSED, libtrace_thread_t *t UNUSED, 228 void *global UNUSED, void *tls UNUSED, uint64_t tick UNUSED) { 229 230 fprintf(stderr, "Not expecting a tick packet\n"); 231 kill(getpid(), SIGTERM); 232 } 233 234 static void pause_processing(libtrace_t *trace UNUSED, 235 libtrace_thread_t *t UNUSED, 236 void *global, void *tls) { 237 238 static __thread bool seen_pause_message = false; 239 struct TLS *storage = (struct TLS *)tls; 240 uint32_t *magic = (uint32_t *)global; 241 242 assert(storage != NULL); 243 assert(!storage->seen_stop_message); 244 assert(storage->seen_start_message); 245 assert(*magic == 0xabcdef); 246 247 assert(seen_pause_message == storage->seen_pausing_message); 248 249 seen_pause_message = true; 250 storage->seen_pausing_message = true; 251 } 252 253 static void resume_processing(libtrace_t *trace UNUSED, 254 libtrace_thread_t *t UNUSED, 255 void *global, void *tls) { 256 257 static __thread bool seen_resume_message = false; 258 struct TLS *storage = (struct TLS *)tls; 259 uint32_t *magic = (uint32_t *)global; 260 261 assert(storage != NULL); 262 assert(!storage->seen_stop_message); 263 assert(storage->seen_start_message); 264 assert(*magic == 0xabcdef); 265 266 assert(seen_resume_message == storage->seen_resuming_message); 267 268 seen_resume_message = true; 269 storage->seen_resuming_message = true; 196 270 } 197 271 198 272 int main(int argc, char *argv[]) { 199 273 int error = 0; 200 int expected = 100;201 274 const char *tracename; 202 275 libtrace_t *trace; 276 libtrace_callback_set_t *processing = NULL; 277 libtrace_callback_set_t *reporter = NULL; 278 uint32_t global = 0xabcdef; 203 279 204 280 if (argc<2) { … … 212 288 iferr(trace,tracename); 213 289 214 if (strcmp(argv[1],"rtclient")==0) expected=101; 215 216 trace_pstart(trace, NULL, per_packet, report_result); 290 processing = trace_create_callback_set(); 291 trace_set_starting_cb(processing, start_processing); 292 trace_set_stopping_cb(processing, stop_processing); 293 trace_set_packet_cb(processing, per_packet); 294 trace_set_pausing_cb(processing, pause_processing); 295 trace_set_resuming_cb(processing, resume_processing); 296 trace_set_tick_count_cb(processing, process_tick); 297 trace_set_tick_interval_cb(processing, process_tick); 298 299 reporter = trace_create_callback_set(); 300 trace_set_starting_cb(reporter, report_start); 301 trace_set_stopping_cb(reporter, report_end); 302 trace_set_result_cb(reporter, report_cb); 303 304 305 trace_set_perpkt_threads(trace, 4); 306 307 trace_pstart(trace, &global, processing, reporter); 217 308 iferr(trace,tracename); 218 309 … … 226 317 trace_join(trace); 227 318 319 global = 0xffffffff; 320 228 321 /* Now check we have all received all the packets */ 229 if (error == 0) { 230 if (totalpkts == expected) { 231 printf("success: %d packets read\n",expected); 232 } else { 233 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 234 error = 1; 235 } 236 } else { 322 if (error != 0) { 237 323 iferr(trace,tracename); 238 324 } 239 325 240 trace_destroy(trace); 241 return error; 242 } 326 trace_destroy(trace); 327 trace_destroy_callback_set(processing); 328 trace_destroy_callback_set(reporter); 329 return error; 330 } -
test/test-live.c
r116f970 r8decff7 150 150 { 151 151 int err = 0; 152 libtrace_stat_t *stat; 153 154 stat = trace_create_statistics(); 155 156 trace_get_statistics(trace_read, stat); 152 157 // Assume no loss here, if not the case we would of hung in reading loop 153 158 // anyway 154 if (trace_get_dropped_packets(trace_read) != 0) { 155 if (trace_get_dropped_packets(trace_read) == UINT64_MAX) { 156 printf("\tInfo: trace does not support drop counter\n"); 157 } else { 158 ERROR("Trace dropped %zu packets\n", 159 trace_get_dropped_packets(trace_read)); 160 } 161 } 162 if (trace_get_filtered_packets(trace_read) != 0) { 163 if (trace_get_filtered_packets(trace_read) == UINT64_MAX) { 164 printf("\tInfo: trace does not support filter counter\n"); 165 } else { 166 ERROR("Trace dropped %zu packets\n", 167 trace_get_filtered_packets(trace_read)); 168 } 169 } 170 if (trace_get_received_packets(trace_read) != 100) { 171 if (trace_get_received_packets(trace_read) == UINT64_MAX) { 172 printf("\tInfo: trace does not support received counter\n"); 173 } else { 174 ERROR("Trace received %zu packets\n", 175 trace_get_received_packets(trace_read)); 176 } 177 } 178 if (trace_get_accepted_packets(trace_read) != (size_t) test_size) { 179 // This would more likely be a libtrace issue rather than format specific 180 ERROR("Trace only accepted %zu packets\n", 181 trace_get_accepted_packets(trace_read)); 182 } 159 if (!stat->dropped_valid) { 160 printf("\tInfo: trace does not support drop counter\n"); 161 } else if (stat->dropped != 0) { 162 ERROR("Trace dropped %zu packets\n", stat->dropped); 163 } 164 165 if (!stat->filtered_valid) { 166 printf("\tInfo: trace does not support filter counter\n"); 167 } else if (stat->filtered != 0) { 168 ERROR("Trace filtered %zu packets\n", stat->filtered); 169 } 170 171 if (!stat->received_valid) { 172 printf("\tInfo: trace does not support received counter\n"); 173 } else if (stat->received != 100) { 174 ERROR("Trace received %zu/100 packets\n", stat->received); 175 } 176 177 if (!stat->accepted_valid) { 178 printf("\tInfo: trace does not support accepted counter\n"); 179 } else if (stat->accepted != (uint32_t) test_size) { 180 ERROR("Trace only accepted %zu/%u packets\n", stat->accepted, 181 (uint32_t)test_size); 182 } 183 183 184 return err; 184 185 } -
test/test-tracetime-parallel.c
r6b98325 r8decff7 62 62 63 63 64 static int totalpkts = 0;65 static int skippedpkts = 0;66 static int expected;67 68 64 libtrace_t *trace = NULL; 65 int total = 0; 66 69 67 static void signal_handler(int signal) 70 68 { … … 73 71 74 72 /* check within 10 seconds we got 9-11 packets */ 75 assert(check_range_jitter(10.0, (double) total pkts, 1.0));73 assert(check_range_jitter(10.0, (double) total, 1.0)); 76 74 77 75 /* Now fullspeed it */ … … 83 81 } 84 82 85 static void report_result(libtrace_t *trace UNUSED, int mesg, 86 libtrace_generic_t data, 87 libtrace_thread_t *sender UNUSED) { 88 89 switch (mesg) { 90 case MESSAGE_STARTING: 91 break; 92 case MESSAGE_RESULT: 93 switch (data.res->type) { 94 case RESULT_USER: 95 totalpkts++; 96 break; 97 case RESULT_USER+1: 98 skippedpkts++; 99 break; 100 } 101 break; 102 } 103 } 104 105 static void* per_packet(libtrace_t *trace, libtrace_thread_t *t, 106 int mesg, libtrace_generic_t data, 107 libtrace_thread_t *sender UNUSED) { 83 struct counter { 84 int total; 85 int skipped; 86 }; 87 88 static void *start_report(libtrace_t *trace UNUSED, 89 libtrace_thread_t *t UNUSED, void *global UNUSED) { 90 91 struct counter *c = (struct counter *)malloc(sizeof(struct counter)); 92 c->total = 0; 93 c->skipped = 0; 94 return c; 95 96 } 97 98 static void stop_report(libtrace_t *trace UNUSED, 99 libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { 100 101 struct counter *c = (struct counter *)tls; 102 103 assert(c->skipped <= 20); 104 assert(c->skipped + c->total == 100); 105 106 free(c); 107 } 108 109 static void report_cb(libtrace_t *trace UNUSED, 110 libtrace_thread_t *sender UNUSED, void *global UNUSED, 111 void *tls, libtrace_result_t *result) { 112 113 struct counter *c = (struct counter *)tls; 114 if (result->type == RESULT_USER) 115 c->total ++; 116 if (result->type == RESULT_USER + 1) 117 c->skipped ++; 118 119 total = c->total; 120 121 } 122 123 static void *start_process(libtrace_t *trace UNUSED, 124 libtrace_thread_t *t UNUSED, void *global UNUSED) { 125 126 bool *accepting = (bool *)malloc(sizeof(bool)); 127 *accepting = true; 128 return accepting; 129 130 } 131 132 static void stop_process(libtrace_t *trace UNUSED, 133 libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { 134 bool *accepting = (bool *)tls; 135 free(accepting); 136 } 137 138 static void pause_process(libtrace_t *trace UNUSED, 139 libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { 140 bool *accepting = (bool *)tls; 141 *accepting = false; 142 } 143 144 static void resume_process(libtrace_t *trace UNUSED, 145 libtrace_thread_t *t UNUSED, void *global UNUSED, void *tls) { 146 bool *accepting = (bool *)tls; 147 *accepting = true; 148 } 149 150 static void user_message(libtrace_t *trace UNUSED, 151 libtrace_thread_t *t UNUSED, void *global UNUSED, 152 void *tls UNUSED, int msg UNUSED, libtrace_generic_t ts) { 153 154 struct timeval tv; 155 double time; 156 157 gettimeofday(&tv, NULL); 158 time = timeval_to_seconds(tv); 159 160 assert(check_range_jitter(ts.rdouble, time, 0.01)); 161 } 162 163 static libtrace_packet_t *per_packet(libtrace_t *trace, libtrace_thread_t *t, 164 void *global UNUSED, void *tls, libtrace_packet_t *packet) { 165 108 166 struct timeval tv; 109 167 double time; 110 168 libtrace_message_t message; 111 static __thread bool accepting = true;169 bool *accepting = (bool *)tls; 112 170 113 171 gettimeofday(&tv, NULL); 114 172 time = timeval_to_seconds(tv); 115 173 116 switch (mesg) { 117 case MESSAGE_PACKET: 118 /* In order to instantly pause a trace we don't delay any buffered packets 119 * These are sent after MESSAGE_PAUSING has been received */ 120 if (accepting) { 121 fprintf(stderr, "."); 122 trace_publish_result(trace, t, (uint64_t) time, (libtrace_generic_t){.rdouble = time}, RESULT_USER); 123 124 /* Check that we are not blocking regular message delivery */ 125 message.code = MESSAGE_USER; 126 message.sender = t; 127 message.data.rdouble = time; 128 trace_message_perpkts(trace, &message); 129 } else { 130 trace_publish_result(trace, t, (uint64_t) time, (libtrace_generic_t){.rdouble = time}, RESULT_USER+1); 131 } 132 return data.pkt; 133 case MESSAGE_USER: 134 assert (check_range_jitter(data.rdouble, time, 0.01)); 135 break; 136 case MESSAGE_RESUMING: 137 accepting = true; 138 break; 139 case MESSAGE_PAUSING: 140 accepting = false; 141 break; 142 } 143 return NULL; 144 } 145 174 if (*accepting) { 175 fprintf(stderr, "."); 176 trace_publish_result(trace, t, (uint64_t)time, 177 (libtrace_generic_t){.rdouble = time}, 178 RESULT_USER); 179 /* Test that we are not interfering with message delivery */ 180 message.code = MESSAGE_USER; 181 message.sender = t; 182 message.data.rdouble = time; 183 trace_message_perpkts(trace, &message); 184 } else { 185 trace_publish_result(trace, t, (uint64_t)time, 186 (libtrace_generic_t){.rdouble = time}, 187 RESULT_USER+1); 188 189 } 190 return packet; 191 192 } 146 193 /** 147 194 * Test that tracetime playback functions. … … 156 203 struct timeval tv; 157 204 double start, end; 158 gettimeofday(&tv, NULL); 205 libtrace_callback_set_t *processing; 206 libtrace_callback_set_t *reporter; 207 208 gettimeofday(&tv, NULL); 159 209 start = timeval_to_seconds(tv); 160 210 printf("Testing delay\n"); … … 168 218 trace_set_tracetime(trace, true); 169 219 220 processing = trace_create_callback_set(); 221 trace_set_starting_cb(processing, start_process); 222 trace_set_stopping_cb(processing, stop_process); 223 trace_set_pausing_cb(processing, pause_process); 224 trace_set_resuming_cb(processing, resume_process); 225 trace_set_packet_cb(processing, per_packet); 226 trace_set_user_message_cb(processing, user_message); 227 228 reporter = trace_create_callback_set(); 229 trace_set_starting_cb(reporter, start_report); 230 trace_set_stopping_cb(reporter, stop_report); 231 trace_set_result_cb(reporter, report_cb); 232 233 trace_set_reporter_thold(trace, 1); 234 170 235 // Start it 171 trace_pstart(trace, NULL, p er_packet, report_result);236 trace_pstart(trace, NULL, processing, reporter); 172 237 iferr(trace,tracename); 173 238 fprintf(stderr, "Running in tracetime (Will take about 10 seconds)\t"); … … 179 244 trace_join(trace); 180 245 181 /* Now check we have all received all the packets */ 182 assert(skippedpkts <= 20); /* Note this is hard coded to the default burst_sizeX2 */ 183 if (error == 0) { 184 if (totalpkts + skippedpkts == expected) { 185 printf("success: %d packets read\n",expected); 186 } else { 187 printf("failure: %d packets expected, %d seen\n",expected,totalpkts); 188 error = 1; 189 } 190 } else { 246 if (error != 0) { 191 247 iferr(trace,tracename); 192 248 } … … 202 258 int error = 0; 203 259 const char *tracename; 204 expected = 100;205 260 206 261 signal(SIGALRM, signal_handler); … … 209 264 210 265 error = test_tracetime(tracename); 211 266 fprintf(stderr, "\n"); 212 267 return error; 213 268 }
Note: See TracChangeset
for help on using the changeset viewer.