- Timestamp:
- 10/13/15 09:51:22 (5 years ago)
- Branches:
- 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
- Children:
- 21c0d70
- Parents:
- 92a2bf6
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
lib/combiner_ordered.c
ra31e166 r44f9892 30 30 } 31 31 32 inline static uint64_t next_message(libtrace_queue_t *v) { 32 inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c, 33 libtrace_queue_t *v, uint64_t *key, libtrace_result_t *peeked) { 33 34 34 35 libtrace_result_t r; 35 if (libtrace_deque_peek_front(v, (void *) &r) == 0) 36 return 0; 37 38 if (r.type == RESULT_TICK_INTERVAL || r.type == RESULT_TICK_COUNT) 39 return 0; 40 41 return r.key; 42 } 43 44 inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c, 45 libtrace_queue_t *v, uint64_t *key) { 46 47 libtrace_result_t r; 48 libtrace_deque_peek_front(v, (void *) &r); 36 if (!peeked) { 37 libtrace_deque_peek_front(v, (void *) &r); 38 peeked = &r; 39 } 49 40 50 41 /* Ticks are a bit tricky, because we can get TS … … 56 47 */ 57 48 58 if ( r.type == RESULT_TICK_INTERVAL) {59 if ( r.key > c->last_ts_tick) {60 c->last_ts_tick = r.key;49 if (peeked->type == RESULT_TICK_INTERVAL) { 50 if (peeked->key > c->last_ts_tick) { 51 c->last_ts_tick = peeked->key; 61 52 62 53 /* Tick doesn't match packet order */ 63 54 if (!trace_is_parallel(trace)) { 64 55 /* Pass straight to reporter */ 65 libtrace_generic_t gt = {.res = &r};66 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);56 libtrace_generic_t gt = {.res = peeked}; 57 ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1); 67 58 send_message(trace, &trace->reporter_thread, 68 59 MESSAGE_RESULT, gt, … … 71 62 } 72 63 /* Tick matches packet order */ 73 *key = r.key;64 *key = peeked->key; 74 65 return 1; 75 66 76 67 } else { 77 68 /* Duplicate -- pop it */ 78 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);69 ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1); 79 70 return 0; 80 71 } 81 72 } 82 73 83 if ( r.type == RESULT_TICK_COUNT) {84 if ( r.key > c->last_count_tick) {85 c->last_count_tick = r.key;74 if (peeked->type == RESULT_TICK_COUNT) { 75 if (peeked->key > c->last_count_tick) { 76 c->last_count_tick = peeked->key; 86 77 87 78 /* Tick doesn't match packet order */ 88 79 if (trace_is_parallel(trace)) { 89 80 /* Pass straight to reporter */ 90 libtrace_generic_t gt = {.res = &r};91 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);81 libtrace_generic_t gt = {.res = peeked}; 82 ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1); 92 83 send_message(trace, &trace->reporter_thread, 93 84 MESSAGE_RESULT, gt, … … 96 87 } 97 88 /* Tick matches packet order */ 98 *key = r.key;89 *key = peeked->key; 99 90 return 1; 100 91 … … 102 93 } else { 103 94 /* Duplicate -- pop it */ 104 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1);95 ASSERT_RET (libtrace_deque_pop_front(v, (void *) peeked), == 1); 105 96 return 0; 106 97 } 107 98 } 108 109 *key = r.key;99 100 *key = peeked->key; 110 101 return 1; 111 102 } 103 104 inline static uint64_t next_message(libtrace_t *trace, libtrace_combine_t *c, 105 libtrace_queue_t *v) { 106 107 libtrace_result_t r; 108 uint64_t nextkey = 0; 109 110 do { 111 if (libtrace_deque_peek_front(v, (void *) &r) == 0) { 112 return 0; 113 } 114 } while (peek_queue(trace, c, v, &nextkey, &r) == 0); 115 116 return nextkey; 117 } 118 112 119 113 120 inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){ … … 126 133 for (i = 0; i < trace_get_perpkt_threads(trace); ++i) { 127 134 libtrace_queue_t *v = &queues[i]; 128 if (libtrace_deque_get_size(v) != 0) { 129 if (peek_queue(trace, c, v, &peeked)) { 130 live_count ++; 131 live[i] = true; 132 key[i] = peeked; 133 if (i == 0 || min_key > peeked) { 134 min_key = peeked; 135 min_queue = i; 136 } 137 } else { 138 live[i] = false; 139 key[i] = 0; 135 if (libtrace_deque_get_size(v) != 0 && 136 peek_queue(trace, c, v, &peeked, NULL)) { 137 live_count ++; 138 live[i] = true; 139 key[i] = peeked; 140 if (i == 0 || min_key > peeked) { 141 min_key = peeked; 142 min_queue = i; 140 143 } 141 144 } else { … … 149 152 * joined we always flush what's left. Or the next smallest is the same 150 153 * value or less than the previous */ 151 while ((allactive && min_queue != -1) || (live_count && final) 152 || (live_count && prev_min >= min_key)) { 154 while (allactive || (live_count && final)) { 153 155 /* Get the minimum queue and then do stuff */ 154 156 libtrace_result_t r; … … 156 158 157 159 ASSERT_RET (libtrace_deque_pop_front(&queues[min_queue], (void *) &r), == 1); 160 158 161 send_message(trace, &trace->reporter_thread, 159 162 MESSAGE_RESULT, gt, … … 161 164 162 165 // Now update the one we just removed 163 peeked = next_message(&queues[min_queue]); 164 if (libtrace_deque_get_size(&queues[min_queue]) && 165 peeked != 0) { 166 peeked = next_message(trace, c, &queues[min_queue]); 167 if (peeked != 0) { 166 168 167 169 key[min_queue] = peeked; … … 182 184 allactive = false; 183 185 live[min_queue] = false; 186 key[min_queue] = 0; 184 187 live_count--; 185 188 prev_min = min_key; 186 189 min_key = UINT64_MAX; // Update our minimum 190 min_queue = -1; 187 191 // Check all find the smallest again - all are alive 188 192 for (i = 0; i < trace_get_perpkt_threads(trace); ++i) {
Note: See TracChangeset
for help on using the changeset viewer.