Changeset f2066fa
- Timestamp:
- 08/25/15 17:40:12 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 2fa43fa
- Parents:
- 03aca91
- Location:
- lib
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/format_linux_common.c
r773a2a3 rf2066fa 661 661 linuxcommon_close_input_stream(libtrace, stream); 662 662 } 663 libtrace_list_deinit(FORMAT_DATA->per_stream);664 free(libtrace->format_data);665 libtrace->format_data = NULL;666 663 return -1; 667 664 } -
lib/format_linux_int.c
re99c493 rf2066fa 69 69 { 70 70 int ret = linuxcommon_start_input_stream(libtrace, FORMAT_DATA_FIRST); 71 if (ret != 0) {72 libtrace_list_deinit(FORMAT_DATA->per_stream);73 free(libtrace->format_data);74 libtrace->format_data = NULL;75 }76 71 return ret; 77 72 } -
lib/format_linux_ring.c
r9d89626 rf2066fa 280 280 { 281 281 int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST); 282 if (ret != 0) {283 libtrace_list_deinit(FORMAT_DATA->per_stream);284 free(libtrace->format_data);285 libtrace->format_data = NULL;286 }287 282 return ret; 288 283 } -
lib/libtrace_int.h
r4007dbb rf2066fa 332 332 /** User defined per_msg function called when a message is ready */ 333 333 fn_cb_msg per_msg; 334 /** User defined reporter function entry point XXX not hooked up*/334 /** User defined reporter function entry point */ 335 335 fn_reporter reporter; 336 336 /** The hasher function */ -
lib/libtrace_parallel.h
r4007dbb rf2066fa 537 537 * 538 538 * @param libtrace The input trace to start 539 * @param message The message to intercept540 539 * @param handler the handler to be called when the message is received 541 540 * @return 0 if successful otherwise -1. … … 627 626 /** Store data against a thread. 628 627 * 629 * @param The parallel trace.630 * @param data The new value to save against the t race628 * @param thread The thread 629 * @param data The new value to save against the thread 631 630 * @return The previously stored value 632 631 * … … 713 712 * as possible (real-time). 714 713 * 715 * @param A parallel input trace714 * @param trace A parallel input trace 716 715 * @param tracetime If true packets are released with time intervals matching 717 716 * the original trace. Otherwise packets are read as fast as possible. … … 895 894 * 896 895 * @param key (Typically) The packets order, see trace_packet_get_order() 897 * @param898 896 */ 899 897 RESULT_PACKET, … … 1051 1049 */ 1052 1050 DLLEXPORT bool trace_has_finished(libtrace_t * libtrace); 1051 1052 1053 /** Check if libtrace is directly reading from multiple queues 1054 * from the format (such as a NICs hardware queues). 1055 * 1056 * When a parallel trace is running, or if checked after its completion 1057 * this returns true if a trace was able to run natively parallel 1058 * from the format. Otherwise false is returned, meaning libtrace is 1059 * distibuting packets across multiple threads from a single source. 1060 * 1061 * Factors that may stop this happening despite the format supporting 1062 * native parallel reads include: the choice of hasher function, 1063 * the number of threads choosen (such as 1 or more than the trace supports) 1064 * or another error when trying to start the parallel format. 1065 * 1066 * If this is called before the trace is started. I.e. before pstart 1067 * this returns an indication that the trace has the possiblity to support 1068 * native parallel reads. After trace pstart is called this should be 1069 * checked again to confirm this has happened. 1070 * 1071 * 1072 * @return true if the trace is parallel or false if the library is splitting 1073 * the trace into multiple threads. 1074 */ 1075 DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace); 1053 1076 1054 1077 /** Returns either the sequence number or erf timestamp of a packet. -
lib/trace_parallel.c
reea427f rf2066fa 365 365 pthread_t tid = pthread_self(); 366 366 // Check if we are reporter or something else 367 if (pthread_equal(tid, libtrace->reporter_thread.tid)) 367 if (libtrace->hasher_thread.type == THREAD_REPORTER && 368 pthread_equal(tid, libtrace->reporter_thread.tid)) 368 369 ret = &libtrace->reporter_thread; 369 else if (pthread_equal(tid, libtrace->hasher_thread.tid)) 370 else if (libtrace->hasher_thread.type == THREAD_HASHER && 371 pthread_equal(tid, libtrace->hasher_thread.tid)) 370 372 ret = &libtrace->hasher_thread; 371 373 else … … 589 591 590 592 if (trace->format->pregister_thread) { 591 trace->format->pregister_thread(trace, t, !trace_has_dedicated_hasher(trace));593 trace->format->pregister_thread(trace, t, trace_is_parallel(trace)); 592 594 } 593 595 … … 1364 1366 libtrace->global_blob = global_blob; 1365 1367 1366 if (libtrace->perpkt_thread_count > 1 && 1367 trace_supports_parallel(libtrace) && 1368 !trace_has_dedicated_hasher(libtrace)) { 1368 if (trace_is_parallel(libtrace)) { 1369 1369 err = libtrace->format->pstart_input(libtrace); 1370 1370 } else { … … 1579 1579 } 1580 1580 1581 DLLEXPORT bool trace_is_parallel(libtrace_t * libtrace) { 1582 if (libtrace->state == STATE_NEW) 1583 return trace_supports_parallel(libtrace); 1584 return libtrace->pread == trace_pread_packet_wrapper; 1585 } 1586 1581 1587 DLLEXPORT int trace_pstart(libtrace_t *libtrace, void* global_blob, 1582 1588 fn_cb_msg per_msg, fn_reporter reporter) { … … 1626 1632 verify_configuration(libtrace); 1627 1633 1634 ret = -1; 1628 1635 /* Try start the format - we prefer parallel over single threaded, as 1629 1636 * these formats should support messages better */ … … 1632 1639 ret = libtrace->format->pstart_input(libtrace); 1633 1640 libtrace->pread = trace_pread_packet_wrapper; 1634 } else { 1641 } 1642 if (ret != 0) { 1635 1643 if (libtrace->format->start_input) { 1636 1644 ret = libtrace->format->start_input(libtrace); … … 1770 1778 libtrace->perpkt_thread_states[THREAD_FINISHED] = 0; 1771 1779 cleanup_started: 1772 if (trace_supports_parallel(libtrace) && 1773 !trace_has_dedicated_hasher(libtrace) 1774 && libtrace->perpkt_thread_count > 1) { 1780 if (libtrace->pread == trace_pread_packet_wrapper) { 1775 1781 if (libtrace->format->ppause_input) 1776 1782 libtrace->format->ppause_input(libtrace); … … 1933 1939 // Save the statistics against the trace 1934 1940 trace_get_statistics(libtrace, NULL); 1935 if (trace_ supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace) && libtrace->perpkt_thread_count > 1) {1941 if (trace_is_parallel(libtrace)) { 1936 1942 libtrace->started = false; 1937 1943 if (libtrace->format->ppause_input) … … 2355 2361 return 0; 2356 2362 } 2357 2358 2359 2363 2360 2364 static bool config_bool_parse(char *value, size_t nvalue) {
Note: See TracChangeset
for help on using the changeset viewer.