Changeset d3849c7
- Timestamp:
- 03/30/15 13:49:08 (6 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 773a2a3
- Parents:
- c723e9e
- Location:
- lib
- Files:
-
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/libtrace_parallel.h
r7c95027 rd3849c7 849 849 /** Check if a dedicated hasher thread is being used. 850 850 * 851 * @param[in] libtrace The parallel input trace 851 852 * @return True if the trace has dedicated hasher thread otherwise false. 852 853 * … … 857 858 /** Checks if a trace is using a reporter 858 859 * 859 * @param[in] The parallel input trace860 * @param[in] libtrace The parallel input trace 860 861 * @return True if the trace is using a reporter otherwise false 861 862 */ -
lib/trace.c
rac65c9f rd3849c7 824 824 if (packet->trace && packet->trace->format->fin_packet) { 825 825 packet->trace->format->fin_packet(packet); 826 //gettimeofday(&tv, NULL);827 //printf ("%d.%06d DESTROYED #%"PRIu64"\n", tv.tv_sec, tv.tv_usec, trace_packet_get(packet));828 826 } 829 827 … … 1391 1389 /* Make sure not one bet us to this */ 1392 1390 if (filter->flag) { 1393 printf("Someone bet us to compile the filter\n");1394 1391 assert (pthread_mutex_unlock(&mutex) == 0); 1395 1392 return 1; … … 1498 1495 ASSERT_RET(pthread_mutex_lock(&mutex), == 0); 1499 1496 /* Again double check here like the bpf filter */ 1500 if(filter->jitfilter) 1501 printf("Someone bet us to compile the JIT thingy\n"); 1502 else 1497 if(!filter->jitfilter) 1503 1498 /* Looking at compile_program source this appears to be thread safe 1504 1499 * however if this gets called twice we will leak this memory :( -
lib/trace_parallel.c
rc723e9e rd3849c7 461 461 /* If a hasher thread is running, empty input queues so we don't lose data */ 462 462 if (trace_has_dedicated_hasher(trace)) { 463 fprintf(stderr, "Trace is using a hasher thread emptying queues\n");464 463 // The hasher has stopped by this point, so the queue shouldn't be filling 465 464 while(!libtrace_ringbuffer_is_empty(&t->rbuffer) || t->format_data) { … … 482 481 assert(packet->error <= 0); 483 482 } 484 fprintf(stderr, "PREAD_FAILED %d\n", ret);485 483 libtrace_ocache_free(&trace->packet_freelist, (void **) &packet, 1, 1); 486 484 return -1; … … 522 520 pthread_exit(NULL); 523 521 } 524 //printf("Yay Started perpkt thread #%d\n", (int) get_thread_table_num(trace));525 522 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 526 523 … … 557 554 continue; 558 555 case MESSAGE_DO_STOP: // This is internal 559 fprintf(stderr, "DO_STOP stop!!\n");560 556 goto eof; 561 557 } … … 675 671 pthread_exit(NULL); 676 672 } 677 678 printf("Hasher Thread started\n");679 673 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); 680 674 … … 755 749 for (i = 0; i < trace->perpkt_thread_count; i++) { 756 750 libtrace_packet_t * bcast; 757 fprintf(stderr, "Broadcasting error/EOF now the trace is over\n");758 751 if (i == trace->perpkt_thread_count - 1) { 759 752 bcast = packet; … … 766 759 // Unlock early otherwise we could deadlock 767 760 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast); 768 } else {769 fprintf(stderr, "SKIPPING THREAD !!!%d!!!/n", (int) i);770 761 } 771 762 ASSERT_RET(pthread_mutex_unlock(&trace->libtrace_lock), == 0); … … 846 837 /* We store the last error message here */ 847 838 if (t->format_data) { 848 fprintf(stderr, "Hit me, ohh yeah got error %d\n",849 ((libtrace_packet_t *)t->format_data)->error);850 839 return ((libtrace_packet_t *)t->format_data)->error; 851 840 } … … 857 846 858 847 if (packets[0]->error <= 0 && packets[0]->error != READ_TICK) { 859 fprintf(stderr, "Hit me, ohh yeah returning error %d\n", packets[0]->error);860 848 return packets[0]->error; 861 849 } … … 877 865 assert(t->format_data == NULL); 878 866 t->format_data = packets[i]; 879 fprintf(stderr, "Hit me, ohh yeah set error %d\n",880 ((libtrace_packet_t *)t->format_data)->error);881 867 } 882 868 break; … … 999 985 libtrace_thread_t *t = &trace->reporter_thread; 1000 986 1001 fprintf(stderr, "Reporter thread starting\n");1002 1003 987 /* Wait until all threads are started */ 1004 988 ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0); … … 1060 1044 uint64_t next_release; 1061 1045 libtrace_thread_t *t = &trace->keepalive_thread; 1062 1063 fprintf(stderr, "keepalive thread is starting\n");1064 1046 1065 1047 /* Wait until all threads are started */ … … 1144 1126 FD_SET(mesg_fd, &rfds); 1145 1127 // We need to wait 1146 1147 //printf("WAITING for %d.%d next=%"PRIu64" curr=%"PRIu64" seconds packettime %f\n", delay_tv.tv_sec, delay_tv.tv_usec, next_release, curr_usec, trace_get_seconds(packet));1148 1128 ret = select(mesg_fd+1, &rfds, NULL, NULL, &delay_tv); 1149 1129 if (ret == 0) { … … 1152 1132 return READ_MESSAGE; 1153 1133 } else { 1154 fprintf(stderr, "I thnik we broke select\n");1134 assert(!"trace_delay_packet: Unexpected return from select"); 1155 1135 } 1156 1136 } … … 1320 1300 trace_supports_parallel(libtrace) && 1321 1301 !trace_has_dedicated_hasher(libtrace)) { 1322 fprintf(stderr, "Restarting trace pstart_input()\n");1323 1302 err = libtrace->format->pstart_input(libtrace); 1324 1303 } else { 1325 1304 if (libtrace->format->start_input) { 1326 fprintf(stderr, "Restarting trace start_input()\n");1327 1305 err = libtrace->format->start_input(libtrace); 1328 1306 } … … 1522 1500 env = getenv(env_name); 1523 1501 if (env) { 1524 printf("Got %s=%s", env_name, env);1525 1502 trace_set_configuration(libtrace, env); 1526 1503 } … … 1531 1508 env = getenv(env_name); 1532 1509 if (env) { 1533 printf("Got %s=%s", env_name, env);1534 1510 trace_set_configuration(libtrace, env); 1535 1511 } … … 1587 1563 if (trace_supports_parallel(libtrace) && 1588 1564 !trace_has_dedicated_hasher(libtrace)) { 1589 printf("Using the parallel trace format\n");1590 1565 ret = libtrace->format->pstart_input(libtrace); 1591 1566 libtrace->pread = trace_pread_packet_wrapper; 1592 1567 } else { 1593 printf("Using single threaded interface\n");1594 1568 if (libtrace->format->start_input) { 1595 1569 ret = libtrace->format->start_input(libtrace); … … 1766 1740 ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0); 1767 1741 if (!libtrace->started || libtrace->state != STATE_RUNNING) { 1768 fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state);1769 1742 trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()"); 1770 1743 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0); … … 1860 1833 } else { 1861 1834 int err; 1862 fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata);1863 1835 err = trace_pause(libtrace); 1864 1836 // We should handle this a bit better … … 1970 1942 * user controlled */ 1971 1943 for (i=0; i< libtrace->perpkt_thread_count; i++) { 1972 //printf("Waiting to join with perpkt #%d\n", i);1973 1944 ASSERT_RET(pthread_join(libtrace->perpkt_threads[i].tid, NULL), == 0); 1974 //printf("Joined with perpkt #%d\n", i);1975 1945 // So we must do our best effort to empty the queue - so 1976 1946 // the producer (or any other threads) don't block. … … 2332 2302 uc->debug_state = config_bool_parse(value, nvalue); 2333 2303 } else { 2334 fprintf(stderr, "No matching value %s(=%s)\n", key, value);2304 fprintf(stderr, "No matching option %s(=%s), ignoring\n", key, value); 2335 2305 } 2336 2306 } … … 2353 2323 config_string(&trace->config, key, sizeof(key), value, sizeof(value)); 2354 2324 } else { 2355 fprintf(stderr, "Error parsing %s\n", pch);2325 fprintf(stderr, "Error parsing option %s\n", pch); 2356 2326 } 2357 2327 pch = strtok (NULL," ,.-");
Note: See TracChangeset
for help on using the changeset viewer.