Changeset fac8c46 for lib/trace_parallel.c
- Timestamp:
- 05/20/14 03:25:13 (8 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 82facc5
- Parents:
- a5662447
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/trace_parallel.c
r5ce14a5 rfac8c46 107 107 } contention_stats[1024]; 108 108 109 inline int trace_has_dedicated_hasher(libtrace_t * libtrace); 109 110 110 111 /** … … 188 189 } 189 190 191 /** Used below in trace_make_results_packets_safe*/ 192 static void do_copy_result_packet(void *data) 193 { 194 libtrace_result_t *res = (libtrace_result_t *)data; 195 if (res->is_packet) { 196 // Duplicate the packet in standard malloc'd memory and free the 197 // original 198 libtrace_packet_t *oldpkt, *dup; 199 oldpkt = (libtrace_packet_t *) res->value; 200 dup = trace_copy_packet(oldpkt); 201 res->value = (void *)dup; 202 trace_destroy_packet(oldpkt); 203 fprintf(stderr, "Made a packet safe!!\n"); 204 } 205 } 206 207 /** 208 * Make a safe replacement copy of any result packets that are owned 209 * by the format in the result queue. Used when pausing traces. 210 */ 211 static void trace_make_results_packets_safe(libtrace_t *trace) { 212 libtrace_thread_t *t = get_thread_descriptor(trace); 213 if (trace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) 214 libtrace_deque_apply_function(&t->deque, &do_copy_result_packet); 215 else 216 libtrace_vector_apply_function(&t->vector, &do_copy_result_packet); 217 } 218 190 219 /** 191 220 * Holds threads in a paused state, until released by broadcasting … … 194 223 static void trace_thread_pause(libtrace_t *trace) { 195 224 printf("Pausing thread #%d\n", get_thread_table_num(trace)); 225 trace_make_results_packets_safe(trace); 196 226 assert(pthread_mutex_lock(&trace->libtrace_lock) == 0); 197 227 trace->perpkts_pausing++; 198 228 pthread_cond_broadcast(&trace->perpkt_cond); 199 while ( !trace->started) {229 while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) { 200 230 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0); 201 231 } … … 212 242 libtrace_t *trace = (libtrace_t *)data; 213 243 libtrace_thread_t * t; 214 libtrace_message_t message ;244 libtrace_message_t message = {0}; 215 245 libtrace_packet_t *packet = NULL; 216 246 … … 237 267 if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) { 238 268 switch (message.code) { 239 case MESSAGE_PAUSE: 269 case MESSAGE_DO_PAUSE: // This is internal 270 // Send message to say we are pausing, TODO consider sender 271 message.code = MESSAGE_PAUSING; 272 (*trace->per_pkt)(trace, NULL, &message, t); 273 // If a hasher thread is running empty input queues so we don't loose data 274 if (trace_has_dedicated_hasher(trace)) { 275 fprintf(stderr, "Trace is using a hasher thread emptying queues\n"); 276 // The hasher has stopped by this point, so the queue shouldn't be filling 277 while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) { 278 psize = trace_pread_packet(trace, &packet); 279 if (psize > 0) { 280 packet = (*trace->per_pkt)(trace, packet, NULL, t); 281 } else { 282 fprintf(stderr, "Psize=%d empty=%d this is probably EOF or message waiting, but if this continues we have a bug!!!\n", psize, libtrace_ringbuffer_is_empty(&t->rbuffer)); 283 } 284 } 285 } 286 // Send a paused message as a final chance to memory copy any packets 287 message.code = MESSAGE_PAUSED; 288 (*trace->per_pkt)(trace, NULL, &message, t); 289 // Now we do the actual pause, this returns when we are done 240 290 trace_thread_pause(trace); 241 break; 242 case MESSAGE_STOP: 291 // Check for new messages as soon as we return 292 continue; 293 case MESSAGE_DO_STOP: // This is internal 243 294 goto stop; 244 295 } … … 296 347 297 348 /** True if trace has dedicated hasher thread otherwise false */ 298 inline int trace_has_dedicated_hasher(libtrace_t * libtrace);299 349 inline int trace_has_dedicated_hasher(libtrace_t * libtrace) 300 350 { … … 312 362 int i; 313 363 libtrace_packet_t * packet; 364 libtrace_message_t message = {0}; 314 365 315 366 assert(trace_has_dedicated_hasher(trace)); … … 331 382 break; 332 383 384 // Check for messages that we expect MESSAGE_DO_PAUSE, (internal messages only) 385 if (libtrace_message_queue_try_get(&t->messages, &message) != LIBTRACE_MQ_FAILED) { 386 switch(message.code) { 387 case MESSAGE_DO_PAUSE: 388 fprintf(stderr, "Pausing hasher thread\n"); 389 assert(pthread_mutex_lock(&trace->libtrace_lock) == 0); 390 trace->perpkts_pausing++; 391 pthread_cond_broadcast(&trace->perpkt_cond); 392 while (trace->state == STATE_PAUSED || trace->state == STATE_PAUSING) { 393 assert(pthread_cond_wait(&trace->perpkt_cond, &trace->libtrace_lock) == 0); 394 } 395 // trace->perpkts_pausing--; // Don't do this the pausing thread will do this for us 396 pthread_cond_broadcast(&trace->perpkt_cond); 397 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); 398 fprintf(stdout, "Mapper resuming\n"); 399 break; 400 case MESSAGE_DO_STOP: 401 // Stop called after pause 402 assert(trace->started == false); 403 assert(trace->state == STATE_FINSHED); 404 default: 405 fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code); 406 } 407 pkt_skipped = 1; 408 continue; 409 } 410 333 411 if ((packet->error = trace_read_packet(trace, packet)) <1 /*&& psize != LIBTRACE_MESSAGE_WAITING*/) { 334 412 break; /* We are EOF or error'd either way we stop */ … … 361 439 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); 362 440 // Unlock early otherwise we could deadlock 363 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, NULL);441 libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, bcast); 364 442 } else { 365 443 assert(pthread_mutex_unlock(&trace->libtrace_lock) == 0); … … 372 450 373 451 // Notify only after we've defiantly set the state to finished 374 libtrace_message_t message;375 452 message.code = MESSAGE_PERPKT_ENDED; 376 453 message.additional = NULL; … … 437 514 438 515 if (*packet) { 439 return 1;516 return (*packet)->error; 440 517 } else { 441 printf("Got a NULL packet the trace is over\n"); 442 return -1; // We are done for some reason 518 // This is how we do a notify, we send a message before hand to note that the trace is over etc. 519 // And this will notify the perpkt thread to read that message, this is easiest 520 // since cases like pause can also be dealt with this way without actually 521 // having to be the end of the stream. 522 fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n"); 523 return -2; 443 524 } 444 525 } … … 771 852 } 772 853 assert(pthread_spin_unlock(&libtrace->first_packets.lock) == 0); 773 libtrace_message_t mesg ;854 libtrace_message_t mesg = {0}; 774 855 mesg.code = MESSAGE_FIRST_PACKET; 775 856 mesg.additional = NULL; … … 937 1018 trace_fin_packet(*packet); 938 1019 939 if ( libtrace->format->pread_packet) {1020 if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 940 1021 if (!*packet) 941 1022 *packet = trace_create_packet(); 942 1023 ret = trace_pread_packet_wrapper(libtrace, *packet); 943 } else if (!libtrace->hasher) {944 /* We don't care about which core a packet goes to */945 ret = trace_pread_packet_first_in_first_served(libtrace, packet);946 1024 } else if (trace_has_dedicated_hasher(libtrace)) { 947 1025 ret = trace_pread_packet_hasher_thread(libtrace, packet); 948 } else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) { 1026 } else if (!trace_has_dedicated_hasher(libtrace)) { 1027 /* We don't care about which core a packet goes to */ 1028 ret = trace_pread_packet_first_in_first_served(libtrace, packet); 1029 } /* else if (libtrace->reducer_flags & PERPKT_USE_SLIDING_WINDOW) { 949 1030 ret = trace_pread_packet_sliding_window(libtrace, packet); 950 1031 } else { 951 1032 ret = trace_pread_packet_hash_locked(libtrace, packet); 952 } 1033 }*/ 953 1034 954 1035 // Formats can also optionally do this internally to ensure the first … … 978 1059 /* Start an input trace in a parallel fashion. 979 1060 * 980 * @param libtrace 981 * @param global_blob some global data you can share with the new thread1061 * @param libtrace the input trace to start 1062 * @param global_blob some global data you can share with the new perpkt threads 982 1063 * @returns 0 on success 983 1064 */ … … 989 1070 assert(libtrace); 990 1071 if (trace_is_err(libtrace)) 991 return -1;; 992 if (libtrace->perpkts_pausing != 0) { 993 printf("Restarting trace\n"); 994 libtrace->format->pstart_input(libtrace); 995 // TODO empty any queues out here // 1072 return -1; 1073 if (libtrace->state == STATE_PAUSED) { 1074 assert (libtrace->perpkts_pausing != 0); 1075 fprintf(stderr, "Restarting trace\n"); 1076 int err; 1077 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1078 printf("This format has direct support for p's\n"); 1079 err = libtrace->format->pstart_input(libtrace); 1080 } else { 1081 if (libtrace->format->start_input) { 1082 err = libtrace->format->start_input(libtrace); 1083 } 1084 } 996 1085 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 997 1086 libtrace->started = true; 1087 libtrace->state = STATE_RUNNING; 998 1088 assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0); 999 1089 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); … … 1001 1091 } 1002 1092 1093 assert(libtrace->state == STATE_NEW); 1003 1094 libtrace_parallel = 1; 1004 1095 … … 1030 1121 1031 1122 libtrace->started=true; // Before we start the threads otherwise we could have issues 1123 libtrace->state = STATE_RUNNING; 1032 1124 /* Disable signals - Pthread signal handling */ 1033 1125 … … 1043 1135 t->type = THREAD_HASHER; 1044 1136 t->state = THREAD_RUNNING; 1137 libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t)); 1045 1138 assert(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace) == 0); 1046 1139 } else { 1047 1140 libtrace->hasher_thread.type = THREAD_EMPTY; 1048 1141 } 1049 libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_ POLLING);1142 libtrace_ringbuffer_init(&libtrace->packet_freelist, libtrace->packet_freelist_size, LIBTRACE_RINGBUFFER_BLOCKING); 1050 1143 libtrace_slidingwindow_init(&libtrace->sliding_window, libtrace->packet_freelist_size, 0); 1051 1144 assert(sem_init(&libtrace->sem, 0, libtrace->packet_freelist_size) == 0); … … 1079 1172 t->perpkt_num = i; 1080 1173 if (libtrace->hasher) 1081 libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_ POLLING);1174 libtrace_ringbuffer_init(&t->rbuffer, libtrace->perpkt_buffer_size, LIBTRACE_RINGBUFFER_BLOCKING); 1082 1175 // Depending on the mode vector or deque might be chosen 1083 1176 libtrace_vector_init(&t->vector, sizeof(libtrace_result_t)); … … 1093 1186 int threads_started = 0; 1094 1187 /* Setup the trace and start our threads */ 1095 if (libtrace->perpkt_thread_count > 1 && libtrace->format->pstart_input) {1188 if (libtrace->perpkt_thread_count > 1 && trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1096 1189 printf("This format has direct support for p's\n"); 1097 1190 threads_started = libtrace->format->pstart_input(libtrace); … … 1128 1221 * 4. Return with perpkts_pausing set to perpkt_count (Used when restarting so we reuse the threads) 1129 1222 * 1130 * Once done you should be a able to modify the trace setup and call pstart again1223 * Once done you should be able to modify the trace setup and call pstart again 1131 1224 * TODO handle changing thread numbers 1132 1225 */ … … 1136 1229 int i; 1137 1230 assert(libtrace); 1138 if (!libtrace->started) { 1231 if (!libtrace->started || libtrace->state != STATE_RUNNING) { 1232 fprintf(stderr, "pause failed started=%d state=%s (%d)\n", libtrace->started, get_trace_state_name(libtrace->state), libtrace->state); 1139 1233 trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_ppause()"); 1140 1234 return -1; … … 1143 1237 t = get_thread_table(libtrace); 1144 1238 1145 // Set paus ed1239 // Set pausing 1146 1240 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 1147 libtrace->sta rted = false;1241 libtrace->state = STATE_PAUSING; 1148 1242 pthread_cond_broadcast(&libtrace->perpkt_cond); 1149 1243 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); 1150 1244 1151 printf("Sending messages \n"); 1245 // Special case handle the hasher thread case 1246 if (trace_has_dedicated_hasher(libtrace)) { 1247 fprintf(stderr, "Hasher thread running we deal with this special!\n"); 1248 libtrace_message_t message = {0}; 1249 message.code = MESSAGE_DO_PAUSE; 1250 message.additional = NULL; 1251 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message); 1252 // Wait for it to pause 1253 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 1254 while (1 != libtrace->perpkts_pausing) { 1255 assert(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock) == 0); 1256 } 1257 libtrace->perpkts_pausing--; // Do this on the hasher's behalf 1258 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); 1259 } 1260 1261 fprintf(stderr, "Sending messages \n"); 1152 1262 // Stop threads, skip this one if its a perpkt 1153 1263 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1154 1264 if (&libtrace->perpkt_threads[i] != t) { 1155 libtrace_message_t message ;1156 message.code = MESSAGE_ PAUSE;1265 libtrace_message_t message = {0}; 1266 message.code = MESSAGE_DO_PAUSE; 1157 1267 message.additional = NULL; 1158 1268 trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message); 1269 if(trace_has_dedicated_hasher(libtrace)) { 1270 // The hasher has stopped and other threads have messages waiting therefore 1271 // If the queues are empty the other threads would have no data 1272 // So send some NULL packets to simply ask the threads to check there message queues 1273 // We are the only writer since hasher has paused 1274 libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL); 1275 } 1276 } else { 1277 fprintf(stderr, "Mapper threads should not be used to pause a trace this could cause any number of problems!!\n"); 1159 1278 } 1160 1279 } … … 1172 1291 } 1173 1292 1174 printf("Threads are pausing\n"); 1175 1176 // Do a early pause to kick threads out - XXX testing for int 1177 if (libtrace->format->pause_input) 1178 libtrace->format->pause_input(libtrace); 1293 fprintf(stderr, "Asking threads to pause\n"); 1179 1294 1180 1295 // Wait for all threads to pause … … 1185 1300 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); 1186 1301 1187 printf("Threads have paused\n"); 1188 1189 if (trace_supports_parallel(libtrace)) { 1302 fprintf(stderr, "Threads have paused\n"); 1303 1304 if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) { 1305 libtrace->started = false; 1190 1306 if (libtrace->format->ppause_input) 1191 1307 libtrace->format->ppause_input(libtrace); 1192 1308 // TODO What happens if we don't have pause input?? 1193 1309 } else { 1194 printf("Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata); 1195 // This doesn't really work because this could be called by any thread 1196 // Maybe we should grab the lock here?? 1197 if (libtrace->format->pause_input) 1198 libtrace->format->pause_input(libtrace); 1199 // TODO What happens if we don't have pause input?? 1200 } 1201 1310 int err; 1311 fprintf(stderr, "Trace is not parallel so we are doing a normal pause %s\n", libtrace->uridata); 1312 err = trace_pause(libtrace); 1313 // We should handle this a bit better 1314 if (err) 1315 return err; 1316 } 1317 1318 // Only set as paused after the pause has been called on the trace 1319 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 1320 libtrace->state = STATE_PAUSED; 1321 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); 1202 1322 return 0; 1203 1323 } … … 1213 1333 { 1214 1334 int i; 1335 libtrace_message_t message = {0}; 1336 assert(libtrace); 1215 1337 if (!libtrace->started) { 1216 1338 trace_set_err(libtrace,TRACE_ERR_BAD_STATE, "You must call trace_start() before calling trace_pstop()"); … … 1219 1341 1220 1342 // Ensure all threads have paused and the underlying trace format has 1221 // been closed 1343 // been closed and all packets associated are cleaned up 1222 1344 trace_ppause(libtrace); 1223 1345 1224 1346 // Now send a message asking the threads to stop 1225 1347 // This will be retrieved before trying to read another packet 1348 1349 message.code = MESSAGE_DO_STOP; 1350 message.additional = NULL; 1351 trace_send_message_to_perpkts(libtrace, &message); 1352 if (trace_has_dedicated_hasher(libtrace)) 1353 trace_send_message_to_thread(libtrace, &libtrace->hasher_thread, &message); 1354 1226 1355 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1227 libtrace_message_t message;1228 message.code = MESSAGE_STOP;1229 message.additional = NULL;1230 1356 trace_send_message_to_thread(libtrace, &libtrace->perpkt_threads[i], &message); 1231 1357 } … … 1233 1359 // Now release the threads and let them stop 1234 1360 assert(pthread_mutex_lock(&libtrace->libtrace_lock) == 0); 1235 libtrace->sta rted = true;1361 libtrace->state = STATE_FINSHED; 1236 1362 assert(pthread_cond_broadcast(&libtrace->perpkt_cond) == 0); 1237 1363 assert(pthread_mutex_unlock(&libtrace->libtrace_lock) == 0); … … 1256 1382 if (hasher) { 1257 1383 trace->hasher = hasher; 1258 trace->hasher_data = hasher;1384 trace->hasher_data = data; 1259 1385 } else { 1260 1386 trace->hasher = NULL; … … 1279 1405 return 0; 1280 1406 case HASHER_BIDIRECTIONAL: 1281 trace->hasher = toeplitz_hash_packet;1407 trace->hasher = &toeplitz_hash_packet; 1282 1408 trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t)); 1283 1409 toeplitz_init_config(trace->hasher_data, 1); 1284 1410 return 0; 1285 1411 case HASHER_UNIDIRECTIONAL: 1286 trace->hasher = toeplitz_hash_packet;1412 trace->hasher = &toeplitz_hash_packet; 1287 1413 trace->hasher_data = calloc(1, sizeof(toeplitz_conf_t)); 1288 toeplitz_init_config(trace->hasher_data, 1);1414 toeplitz_init_config(trace->hasher_data, 0); 1289 1415 return 0; 1290 1416 case HASHER_HARDWARE: … … 1326 1452 // XXX signal it to stop 1327 1453 if (trace_has_dedicated_hasher(libtrace)) { 1328 printf("Waiting to join with the hasher\n");1454 fprintf(stderr, "Waiting to join with the hasher\n"); 1329 1455 pthread_join(libtrace->hasher_thread.tid, NULL); 1330 printf("Joined with with the hasher\n");1456 fprintf(stderr, "Joined with with the hasher\n"); 1331 1457 libtrace->hasher_thread.state = THREAD_FINISHED; 1332 1458 } … … 1416 1542 } 1417 1543 1544 DLLEXPORT int trace_send_message_to_perpkts(libtrace_t * libtrace, libtrace_message_t * message) 1545 { 1546 int i; 1547 message->sender = get_thread_descriptor(libtrace); 1548 for (i = 0; i < libtrace->perpkt_thread_count; i++) { 1549 libtrace_message_queue_put(&libtrace->perpkt_threads[i].messages, message); 1550 } 1551 //printf("Sending message code=%d to reducer\n", message->code); 1552 return 0; 1553 } 1554 1418 1555 DLLEXPORT void libtrace_result_set_key(libtrace_result_t * result, uint64_t key) { 1419 1556 result->key = key; … … 1525 1662 DLLEXPORT void trace_publish_result(libtrace_t *libtrace, uint64_t key, void * value) { 1526 1663 libtrace_result_t res; 1664 res.is_packet = 0; 1527 1665 // Who am I??? 1528 1666 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ? … … 1553 1691 // sched_yield(); 1554 1692 1693 if (libtrace_vector_get_size(&t->vector) >= 800) { 1694 trace_post_reduce(libtrace); 1695 } 1696 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :) 1697 } 1698 } 1699 1700 DLLEXPORT void trace_publish_packet(libtrace_t *libtrace, libtrace_packet_t *packet) { 1701 libtrace_result_t res; 1702 // Who am I??? 1703 int this_thread = get_thread_table_num(libtrace); // Could be worth caching ... ? 1704 libtrace_thread_t * t = &libtrace->perpkt_threads[this_thread]; 1705 // Now put it into my table 1706 static __thread int count = 0; 1707 1708 res.is_packet = 1; 1709 libtrace_result_set_key_value(&res, trace_packet_get_order(packet), packet); 1710 /* 1711 if (count == 1) 1712 printf("My vector size is %d\n", libtrace_vector_get_size(&t->vector)); 1713 count = (count+1) %1000; 1714 libtrace_vector_push_back(&t->vector, &res); // Automatically locking for us :) 1715 */ 1716 /*if (count == 1) 1717 printf("My vector size is %d\n", libtrace_deque_get_size(&t->deque)); 1718 count = (count+1)%1000;*/ 1719 if (libtrace->reducer_flags & (REDUCE_SEQUENTIAL | REDUCE_ORDERED)) { 1555 1720 if (libtrace_deque_get_size(&t->deque) >= 800) { 1721 trace_post_reduce(libtrace); 1722 } 1723 //while (libtrace_deque_get_size(&t->deque) >= 1000) 1724 // sched_yield(); 1725 libtrace_deque_push_back(&t->deque, &res); // Automatically locking for us :) 1726 } else { 1727 //while (libtrace_vector_get_size(&t->vector) >= 1000) 1728 // sched_yield(); 1729 1730 if (libtrace_vector_get_size(&t->vector) >= 800) { 1556 1731 trace_post_reduce(libtrace); 1557 1732 }
Note: See TracChangeset
for help on using the changeset viewer.