- Timestamp:
- 08/21/15 11:22:30 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- eea427f
- Parents:
- 76291d1
- Location:
- lib
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/data-struct/object_cache.c
r14c6c08 r4007dbb 27 27 }; 28 28 29 #ifdef ENABLE_MEM_STATS 29 30 extern __thread struct mem_stats mem_hits; 31 #endif 30 32 31 33 struct local_caches { … … 123 125 /* Get TLS for the list of local_caches */ 124 126 static inline struct local_caches *get_local_caches() { 127 #if HAVE_TLS 125 128 static __thread struct local_caches *lcs = NULL; 126 129 if (lcs) { 127 130 return lcs; 128 } else { 131 } 132 #else 133 struct local_caches *lcs; 134 pthread_once(&memory_destructor_once, &once_memory_cache_key_init); 135 if ((lcs=pthread_getspecific(memory_destructor_key)) != 0) { 136 return lcs; 137 } 138 #endif 139 else { 129 140 /* This thread has not been used with a memory pool before */ 130 141 /* Allocate our TLS */ … … 280 291 memcpy(values, &lc->cache[lc->used - nb_buffers], sizeof(void *) * nb_buffers); 281 292 lc->used -= nb_buffers; 293 #ifdef ENABLE_MEM_STATS 282 294 mem_hits.read.cache_hit += nb_buffers; 283 295 mem_hits.readbulk.cache_hit += 1; 296 #endif 284 297 return nb_buffers; 285 298 } … … 287 300 else if (nb_buffers > lc->total) { 288 301 i = libtrace_ringbuffer_sread_bulk(rb, values, nb_buffers, min_nb_buffers); 302 #ifdef ENABLE_MEM_STATS 289 303 if (i) 290 304 mem_hits.readbulk.ring_hit += 1; … … 292 306 mem_hits.readbulk.miss += 1; 293 307 mem_hits.read.ring_hit += i; 308 #endif 294 309 } else { // Not enough cached 295 310 // Empty the cache and re-fill it and then see what we're left with 296 311 i = lc->used; 297 312 memcpy(values, lc->cache, sizeof(void *) * lc->used); 313 #ifdef ENABLE_MEM_STATS 298 314 mem_hits.read.cache_hit += i; 315 #endif 299 316 300 317 // Make sure we still meet the minimum requirement … … 303 320 else 304 321 lc->used = libtrace_ringbuffer_sread_bulk(rb, lc->cache, lc->total, 0); 305 322 #ifdef ENABLE_MEM_STATS 306 323 if (lc->used == lc->total) 307 324 mem_hits.readbulk.ring_hit += 1; … … 309 326 mem_hits.readbulk.miss += 1; 310 327 mem_hits.read.ring_hit += lc->used; 328 #endif 311 329 } 312 330 … … 319 337 i += remaining; 320 338 } 339 #ifdef ENABLE_MEM_STATS 321 340 mem_hits.read.miss += nb_buffers - i; 341 #endif 322 342 assert(i >= min_nb_buffers); 323 343 return i; … … 379 399 memcpy(&lc->cache[lc->used], values, sizeof(void *) * nb_buffers); 380 400 lc->used += nb_buffers; 401 #ifdef ENABLE_MEM_STATS 381 402 mem_hits.write.cache_hit += nb_buffers; 382 403 mem_hits.writebulk.cache_hit += 1; 404 #endif 383 405 return nb_buffers; 384 406 } … … 386 408 else if (nb_buffers > lc->total) { 387 409 i = libtrace_ringbuffer_swrite_bulk(rb, values, nb_buffers, min_nb_buffers); 410 #ifdef ENABLE_MEM_STATS 388 411 if (i) 389 412 mem_hits.writebulk.ring_hit += 1; … … 391 414 mem_hits.writebulk.miss += 1; 392 415 mem_hits.write.ring_hit += i; 416 #endif 393 417 } else { // Not enough cache space but there might later 394 418 // Fill the cache and empty it and then see what we're left with 395 419 i = (lc->total - lc->used); 396 420 memcpy(&lc->cache[lc->used], values, sizeof(void *) * i); 421 #ifdef ENABLE_MEM_STATS 397 422 mem_hits.write.cache_hit += i; 423 #endif 398 424 399 425 // Make sure we still meet the minimum requirement … … 407 433 memmove(lc->cache, &lc->cache[lc->total - lc->used], sizeof(void *) * lc->used); 408 434 435 #ifdef ENABLE_MEM_STATS 409 436 if (lc->used) 410 437 mem_hits.writebulk.miss += 1; … … 412 439 mem_hits.writebulk.ring_hit += 1; 413 440 mem_hits.write.ring_hit += lc->total - lc->used; 441 #endif 414 442 } 415 443 … … 422 450 i += remaining; 423 451 } 452 #ifdef ENABLE_MEM_STATS 424 453 mem_hits.write.miss += nb_buffers - i; 454 #endif 425 455 return i; 426 456 } -
lib/libtrace_int.h
r76291d1 r4007dbb 330 330 /** The actual freelist */ 331 331 libtrace_ocache_t packet_freelist; 332 /** User defined per_ pkt function called when a pktis ready */333 fn_ per_pkt per_pkt;332 /** User defined per_msg function called when a message is ready */ 333 fn_cb_msg per_msg; 334 334 /** User defined reporter function entry point XXX not hooked up */ 335 335 fn_reporter reporter; … … 359 359 libtrace_combine_t combiner; 360 360 struct { 361 fn_handler message_starting; 362 fn_handler message_stopping; 363 fn_handler message_resuming; 364 fn_handler message_pausing; 365 fn_handler message_packet; 361 fn_cb_starting message_starting; 362 fn_cb_dataless message_stopping; 363 fn_cb_dataless message_resuming; 364 fn_cb_dataless message_pausing; 365 fn_cb_packet message_packet; 366 fn_cb_first_packet message_first_packet; 367 fn_cb_tick message_tick_count; 368 fn_cb_tick message_tick_interval; 366 369 } callbacks; 367 370 }; -
lib/libtrace_parallel.h
r76291d1 r4007dbb 154 154 * @param data unused, do not use this 155 155 * @param sender The sender will be set as the current thread 156 * @return When using a function callback for starting, the returned 157 * value is stored against the thread tls. Otherwise the return is ignored. 156 158 */ 157 159 MESSAGE_STARTING, … … 211 213 * sent once a new packet is encountered 212 214 * 213 * @ 215 * @see trace_get_first_packet() 214 216 */ 215 217 MESSAGE_FIRST_PACKET, … … 225 227 * trace_post_reporter() 226 228 * 227 * @see trace_get_first_packet()228 229 */ 229 230 MESSAGE_POST_REPORTER, … … 236 237 * with an earlier time-stamp. 237 238 * 238 * @param data data.uint64 _tholds the system time-stamp in the239 * @param data data.uint64 holds the system time-stamp in the 239 240 * erf format 240 241 * @param sender should be ignored … … 248 249 * threads will see it between the same packets. 249 250 * 250 * @param data The number of packets seen so far across all threads251 * @param data data.uint64 holds the number of packets seen so far across all threads 251 252 * @param sender Set to the current per-packet thread 252 253 */ … … 397 398 /** 398 399 * The definition for the main function that the user supplies to process 399 * packets.400 * messages. 400 401 * 401 402 * @param trace The trace the packet is related to. … … 412 413 * documentation for the message as to what value these will contain. 413 414 */ 414 typedef void* (*fn_ per_pkt)(libtrace_t* trace,415 416 417 418 415 typedef void* (*fn_cb_msg)(libtrace_t* trace, 416 libtrace_thread_t *thread, 417 int mesg_code, 418 libtrace_generic_t data, 419 libtrace_thread_t *sender); 419 420 420 421 /** … … 452 453 * @param libtrace The input trace to start 453 454 * @param global_blob Global data related to this trace accessible using trace_get_global() 454 * @param per_ pkt A user supplied function called when a packetis ready455 * @param per_msg A user supplied function called when a message is ready 455 456 * @param reporter A user supplied function called when a result is ready. 456 457 * Optional if NULL the reporter thread will not be started. … … 459 460 * This can also be used to restart an existing parallel trace, 460 461 * that has previously been paused using trace_ppause(). 461 * In this case global_blob,per_ pktand reporter will only be updated462 * In this case global_blob,per_msg and reporter will only be updated 462 463 * if they are non-null. Otherwise their previous values will be maintained. 463 464 * 464 465 */ 465 466 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 466 fn_per_pkt per_pkt, fn_reporter reporter); 467 467 fn_cb_msg per_msg, fn_reporter reporter); 468 469 /** 470 * 471 * @param libtrace The parallel trace 472 * @param t The thread that is running 473 * @param global The global storage 474 * @return The returned value is stored against the threads tls. 475 * This is typically passed as tls argument to other messages. 476 */ 477 typedef void* (*fn_cb_starting)(libtrace_t *libtrace, 478 libtrace_thread_t *t, 479 void *global); 480 481 /** 482 * @param libtrace The parallel trace 483 * @param t The thread that is running 484 * @param global The global storage 485 * @param tls The thread local storage 486 */ 487 typedef void (*fn_cb_dataless)(libtrace_t *libtrace, 488 libtrace_thread_t *t, 489 void *global, 490 void *tls); 491 492 /** 493 * @param libtrace The parallel trace 494 * @param t The thread that is running 495 * @param global The global storage 496 * @param tls The thread local storage 497 */ 498 typedef void (*fn_cb_first_packet)(libtrace_t *libtrace, 499 libtrace_thread_t *t, 500 void *global, 501 void *tls, 502 libtrace_packet_t *first_packet, 503 libtrace_thread_t *sender); 504 505 /** 506 * @param libtrace The parallel trace 507 * @param t The thread that is running 508 * @param global The global storage 509 * @param tls The thread local storage 510 * @param uint64_t Either the timestamp or packet count depending on message type 511 */ 512 typedef void (*fn_cb_tick)(libtrace_t *libtrace, 513 libtrace_thread_t *t, 514 void *global, 515 void *tls, 516 uint64_t order); 468 517 469 518 /** 470 519 * @param libtrace The parallel trace 471 520 * @param t The thread 472 * @param data The dataassociated with the message521 * @param packet The packet associated with the message 473 522 * @param global The global storage 474 523 * @param tls The thread local storage 475 */ 476 typedef void* (*fn_handler)(libtrace_t *libtrace, 477 libtrace_thread_t *t, 478 libtrace_generic_t data, 479 void *global, 480 void *tls); 524 * 525 * @return optionally a packet which is handed back to the library, 526 * typically this is the packet supplied. Otherwise NULL. 527 */ 528 typedef libtrace_packet_t* (*fn_cb_packet)(libtrace_t *libtrace, 529 libtrace_thread_t *t, 530 void *global, 531 void *tls, 532 libtrace_packet_t *packet); 481 533 482 534 /** Registers a built-in message with a handler. … … 489 541 * @return 0 if successful otherwise -1. 490 542 */ 491 DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler); 543 544 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler); 545 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler); 546 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler); 547 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler); 548 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler); 549 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler); 550 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler); 551 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler); 492 552 493 553 /** Pauses a trace previously started with trace_pstart() -
lib/trace.c
r76291d1 r4007dbb 268 268 libtrace->perpkt_queue_full = false; 269 269 libtrace->global_blob = NULL; 270 libtrace->per_ pkt= NULL;270 libtrace->per_msg = NULL; 271 271 libtrace->reporter = NULL; 272 272 libtrace->hasher = NULL; … … 390 390 libtrace->perpkt_queue_full = false; 391 391 libtrace->global_blob = NULL; 392 libtrace->per_ pkt= NULL;392 libtrace->per_msg = NULL; 393 393 libtrace->reporter = NULL; 394 394 libtrace->hasher = NULL; -
lib/trace_parallel.c
r76291d1 r4007dbb 113 113 }; 114 114 115 116 #ifdef ENABLE_MEM_STATS 115 117 // Grrr gcc wants this spelt out 116 118 __thread struct mem_stats mem_hits = {{0},{0},{0},{0}}; 117 119 120 118 121 static void print_memory_stats() { 119 #if 0120 122 uint64_t total; 121 123 #if defined(HAVE_PTHREAD_SETNAME_NP) && defined(__linux__) … … 163 165 total, (double) mem_hits.writebulk.miss / (double) total * 100.0); 164 166 } 167 } 168 #else 169 static void print_memory_stats() {} 165 170 #endif 171 172 static const libtrace_generic_t gen_zero = {0}; 173 174 /* This should optimise away the switch to nothing in the explict cases */ 175 static inline void send_message(libtrace_t *trace, libtrace_thread_t *thread, const enum libtrace_messages type, 176 libtrace_generic_t data, libtrace_thread_t *sender) { 177 fn_cb_dataless fn = NULL; 178 switch (type) { 179 case MESSAGE_STARTING: 180 if (trace->callbacks.message_starting) 181 thread->user_data = (*trace->callbacks.message_starting)(trace, thread, trace->global_blob); 182 else if (trace->per_msg) 183 (*trace->per_msg)(trace, thread, type, data, sender); 184 return; 185 case MESSAGE_FIRST_PACKET: 186 if (trace->callbacks.message_first_packet) 187 (*trace->callbacks.message_first_packet)(trace, thread, trace->global_blob, thread->user_data, data.pkt, sender); 188 else if (trace->per_msg) 189 (*trace->per_msg)(trace, thread, type, data, sender); 190 return; 191 case MESSAGE_TICK_COUNT: 192 if (trace->callbacks.message_tick_count) 193 (*trace->callbacks.message_tick_count)(trace, thread, trace->global_blob, thread->user_data, data.uint64); 194 else if (trace->per_msg) 195 (*trace->per_msg)(trace, thread, type, data, sender); 196 return; 197 case MESSAGE_TICK_INTERVAL: 198 if (trace->callbacks.message_tick_interval) 199 (*trace->callbacks.message_tick_interval)(trace, thread, trace->global_blob, thread->user_data, data.uint64); 200 else if (trace->per_msg) 201 (*trace->per_msg)(trace, thread, type, data, sender); 202 return; 203 case MESSAGE_STOPPING: 204 fn = trace->callbacks.message_stopping; 205 break; 206 case MESSAGE_RESUMING: 207 fn = trace->callbacks.message_resuming; 208 break; 209 case MESSAGE_PAUSING: 210 fn = trace->callbacks.message_pausing; 211 break; 212 213 /* These should be unused */ 214 case MESSAGE_DO_PAUSE: 215 case MESSAGE_DO_STOP: 216 case MESSAGE_POST_REPORTER: 217 case MESSAGE_RESULT: 218 case MESSAGE_PACKET: 219 return; 220 case MESSAGE_USER: 221 break; 222 } 223 if (fn) 224 (*fn)(trace, thread, trace->global_blob, thread->user_data); 225 else if (trace->per_msg) 226 (*trace->per_msg)(trace, thread, type, data, sender); 166 227 } 167 228 … … 378 439 libtrace_generic_t data = {.pkt = *packet}; 379 440 if (trace->callbacks.message_packet) 380 *packet = (*trace->callbacks.message_packet)(trace, t, data, trace->global_blob, t->user_data);381 else 382 *packet = (*trace->per_ pkt)(trace, t, MESSAGE_PACKET, data, t);441 *packet = (*trace->callbacks.message_packet)(trace, t, trace->global_blob, t->user_data, *packet); 442 else if (trace->per_msg) 443 *packet = (*trace->per_msg)(trace, t, MESSAGE_PACKET, data, t); 383 444 trace_fin_packet(*packet); 384 445 } else { 385 446 assert((*packet)->error == READ_TICK); 386 447 libtrace_generic_t data = {.uint64 = trace_packet_get_order(*packet)}; 387 (*trace->per_pkt)(trace, t, MESSAGE_TICK_COUNT, data, t);448 send_message(trace, t, MESSAGE_TICK_COUNT, data, t); 388 449 } 389 450 return 0; … … 452 513 453 514 /* Let the user thread know we are going to pause */ 454 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t);515 send_message(trace, t, MESSAGE_PAUSING, gen_zero, t); 455 516 456 517 /* Send through any remaining packets (or messages) without delay */ … … 493 554 /* Now we do the actual pause, this returns when we resumed */ 494 555 trace_thread_pause(trace, t); 495 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t);556 send_message(trace, t, MESSAGE_RESUMING, gen_zero, t); 496 557 return 1; 497 558 } … … 538 599 539 600 /* Let the per_packet function know we have started */ 540 if (trace->callbacks.message_starting) 541 (*trace->callbacks.message_starting)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 542 else 543 (*trace->per_pkt)(trace, t, MESSAGE_STARTING, (libtrace_generic_t){0}, t); 544 545 if (trace->callbacks.message_resuming) 546 (*trace->callbacks.message_resuming)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 547 else 548 (*trace->per_pkt)(trace, t, MESSAGE_RESUMING, (libtrace_generic_t){0}, t); 601 send_message(trace, t, MESSAGE_STARTING, gen_zero, t); 602 send_message(trace, t, MESSAGE_RESUMING, gen_zero, t); 549 603 550 604 for (;;) { … … 566 620 goto eof; 567 621 } 568 (*trace->per_pkt)(trace, t, message.code, message.data, message.sender); 622 (*trace->per_msg)(trace, t, message.code, message.data, message.sender); 623 (*trace->per_msg)(trace, t, message.code, message.data, message.sender); 569 624 /* Continue and the empty messages out before packets */ 570 625 continue; … … 629 684 630 685 // Let the per_packet function know we have stopped 631 if (trace->callbacks.message_pausing) 632 (*trace->callbacks.message_pausing)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 633 else 634 (*trace->per_pkt)(trace, t, MESSAGE_PAUSING, (libtrace_generic_t){0}, t); 635 if (trace->callbacks.message_stopping) 636 (*trace->callbacks.message_stopping)(trace, t, (libtrace_generic_t){0}, trace->global_blob, t->user_data); 637 else 638 (*trace->per_pkt)(trace, t, MESSAGE_STOPPING, (libtrace_generic_t){0}, t); 639 686 send_message(trace, t, MESSAGE_PAUSING, gen_zero, t); 687 send_message(trace, t, MESSAGE_STOPPING, gen_zero, t); 640 688 641 689 // Free any remaining packets … … 1263 1311 */ 1264 1312 static int trace_prestart(libtrace_t * libtrace, void *global_blob, 1265 fn_ per_pkt per_pkt, fn_reporter reporter) {1313 fn_cb_msg per_msg, fn_reporter reporter) { 1266 1314 int i, err = 0; 1267 1315 if (libtrace->state != STATE_PAUSED) { … … 1306 1354 1307 1355 /* Update functions if requested */ 1308 if (per_ pkt)1309 libtrace->per_ pkt = per_pkt;1310 assert(libtrace->per_ pkt);1356 if (per_msg) 1357 libtrace->per_msg = per_msg; 1358 assert(libtrace->per_msg); 1311 1359 if (reporter) 1312 1360 libtrace->reporter = reporter; … … 1530 1578 1531 1579 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 1532 fn_ per_pkt per_pkt, fn_reporter reporter) {1580 fn_cb_msg per_msg, fn_reporter reporter) { 1533 1581 int i; 1534 1582 int ret = -1; … … 1543 1591 1544 1592 if (libtrace->state == STATE_PAUSED) { 1545 ret = trace_prestart(libtrace, global_blob, per_ pkt, reporter);1593 ret = trace_prestart(libtrace, global_blob, per_msg, reporter); 1546 1594 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); 1547 1595 return ret; … … 1558 1606 /* Store the user defined things against the trace */ 1559 1607 libtrace->global_blob = global_blob; 1560 libtrace->per_ pkt = per_pkt;1608 libtrace->per_msg = per_msg; 1561 1609 libtrace->reporter = reporter; 1562 1610 /* And zero other fields */ … … 1737 1785 } 1738 1786 1739 DLLEXPORT int trace_set_handler(libtrace_t *libtrace, enum libtrace_messages message, fn_handler handler) { 1740 switch (message) { 1741 case MESSAGE_STARTING: 1742 libtrace->callbacks.message_starting = handler; 1743 return 0; 1744 case MESSAGE_STOPPING: 1745 libtrace->callbacks.message_stopping = handler; 1746 return 0; 1747 case MESSAGE_RESUMING: 1748 libtrace->callbacks.message_resuming = handler; 1749 return 0; 1750 case MESSAGE_PAUSING: 1751 libtrace->callbacks.message_pausing = handler; 1752 return 0; 1753 case MESSAGE_PACKET: 1754 libtrace->callbacks.message_packet = handler; 1755 return 0; 1756 default: 1757 return -1; 1758 } 1759 return -1; 1760 } 1761 1787 DLLEXPORT int trace_cb_starting(libtrace_t *libtrace, fn_cb_starting handler) { 1788 libtrace->callbacks.message_starting = handler; 1789 return 0; 1790 } 1791 1792 DLLEXPORT int trace_cb_pausing(libtrace_t *libtrace, fn_cb_dataless handler) { 1793 libtrace->callbacks.message_pausing = handler; 1794 return 0; 1795 } 1796 1797 DLLEXPORT int trace_cb_resuming(libtrace_t *libtrace, fn_cb_dataless handler) { 1798 libtrace->callbacks.message_resuming = handler; 1799 return 0; 1800 } 1801 1802 DLLEXPORT int trace_cb_stopping(libtrace_t *libtrace, fn_cb_dataless handler) { 1803 libtrace->callbacks.message_stopping = handler; 1804 return 0; 1805 } 1806 1807 DLLEXPORT int trace_cb_packet(libtrace_t *libtrace, fn_cb_packet handler) { 1808 libtrace->callbacks.message_packet = handler; 1809 return 0; 1810 } 1811 1812 DLLEXPORT int trace_cb_first_packet(libtrace_t *libtrace, fn_cb_first_packet handler) { 1813 libtrace->callbacks.message_first_packet = handler; 1814 return 0; 1815 } 1816 1817 DLLEXPORT int trace_cb_tick_count(libtrace_t *libtrace, fn_cb_tick handler) { 1818 libtrace->callbacks.message_tick_count = handler; 1819 return 0; 1820 } 1821 1822 DLLEXPORT int trace_cb_tick_interval(libtrace_t *libtrace, fn_cb_tick handler) { 1823 libtrace->callbacks.message_tick_interval = handler; 1824 return 0; 1825 } 1762 1826 1763 1827 /* … … 1769 1833 * 1770 1834 * Once done you should be able to modify the trace setup and call pstart again 1771 * TODO handle changing thread numbers1835 * TODO add support to change the number of threads. 1772 1836 */ 1773 1837 DLLEXPORT int trace_ppause(libtrace_t *libtrace)
Note: See TracChangeset
for help on using the changeset viewer.