Changeset 3dd5acc for lib/combiner_ordered.c
 Timestamp:
 09/09/15 14:04:10 (6 years ago)
 Branches:
 4.0.1hotfixes, cachetimestamps, develop, dpdkndag, etsilive, libtrace4, master, ndag_format, pfring, rc4.0.1, rc4.0.2, rc4.0.3, rc4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
 Children:
 a2dcdad
 Parents:
 0f6bc3f
 File:

 1 edited
Legend:
 Unmodified
 Added
 Removed

lib/combiner_ordered.c
rf01c479 r3dd5acc 30 30 } 31 31 32 inline static void read_internal(libtrace_t *trace, libtrace_queue_t *queues, const bool final){ 32 inline static uint64_t next_message(libtrace_queue_t *v) { 33 34 libtrace_result_t r; 35 if (libtrace_deque_peek_front(v, (void *) &r) == 0) 36 return 0; 37 38 if (r.type == RESULT_TICK_INTERVAL  r.type == RESULT_TICK_COUNT) 39 return 0; 40 41 return r.key; 42 } 43 44 inline static int peek_queue(libtrace_t *trace, libtrace_combine_t *c, 45 libtrace_queue_t *v, uint64_t *key) { 46 47 libtrace_result_t r; 48 libtrace_deque_peek_front(v, (void *) &r); 49 50 /* Ticks are a bit tricky, because we can get TS 51 * ticks in amongst packets indexed by their cardinal 52 * order and vice versa. Also, every thread will 53 * produce an equivalent tick and we should really 54 * combine those into a single tick for the reporter 55 * thread. 56 */ 57 58 if (r.type == RESULT_TICK_INTERVAL) { 59 if (r.key > c>last_ts_tick) { 60 c>last_ts_tick = r.key; 61 62 /* Tick doesn't match packet order */ 63 if (!trace_is_parallel(trace)) { 64 /* Pass straight to reporter */ 65 libtrace_generic_t gt = {.res = &r}; 66 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 67 trace>reporter(trace, MESSAGE_RESULT, gt, &trace>reporter_thread); 68 return 0; 69 } 70 /* Tick matches packet order */ 71 *key = r.key; 72 return 1; 73 74 } else { 75 /* Duplicate  pop it */ 76 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 77 return 0; 78 } 79 } 80 81 if (r.type == RESULT_TICK_COUNT) { 82 if (r.key > c>last_count_tick) { 83 c>last_count_tick = r.key; 84 85 /* Tick doesn't match packet order */ 86 if (trace_is_parallel(trace)) { 87 /* Pass straight to reporter */ 88 libtrace_generic_t gt = {.res = &r}; 89 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 90 trace>reporter(trace, MESSAGE_RESULT, gt, &trace>reporter_thread); 91 return 0; 92 } 93 /* Tick matches packet order */ 94 *key = r.key; 95 return 1; 96 97 /* Tick doesn't match packet order */ 98 } else { 99 /* Duplicate  pop it */ 100 ASSERT_RET (libtrace_deque_pop_front(v, (void *) &r), == 1); 101 return 0; 102 } 103 } 104 105 *key = r.key; 106 return 1; 107 } 108 109 inline static void read_internal(libtrace_t *trace, libtrace_combine_t *c, const bool final){ 33 110 int i; 34 111 int live_count = 0; 35 bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive 112 libtrace_queue_t *queues = c>queues; 113 bool allactive = true; 114 bool live[libtrace_get_perpkt_count(trace)]; // Set if a trace is alive 36 115 uint64_t key[libtrace_get_perpkt_count(trace)]; // Cached keys 37 116 uint64_t min_key = UINT64_MAX; 38 117 uint64_t prev_min = 0; 118 uint64_t peeked = 0; 39 119 int min_queue = 1; 40 120 41 121 /* Loop through check all are alive (have data) and find the smallest */ 42 122 for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) { 43 123 libtrace_queue_t *v = &queues[i]; 44 124 if (libtrace_deque_get_size(v) != 0) { 45 libtrace_result_t r; 46 libtrace_deque_peek_front(v, (void *) &r); 47 live_count++; 48 live[i] = true; 49 key[i] = r.key; 50 if (i==0  min_key > key[i]) { 51 min_key = key[i]; 52 min_queue = i; 53 } 54 } else { 55 live[i] = false; 56 } 125 if (peek_queue(trace, c, v, &peeked)) { 126 live_count ++; 127 live[i] = true; 128 key[i] = peeked; 129 if (i == 0  min_key > peeked) { 130 min_key = peeked; 131 min_queue = i; 132 } 133 } else { 134 live[i] = false; 135 key[i] = 0; 136 } 137 } else { 138 allactive = false; 139 live[i] = false; 140 key[i] = 0; 141 } 57 142 } 58 143 … … 60 145 * joined we always flush what's left. Or the next smallest is the same 61 146 * value or less than the previous */ 62 while (( live_count == libtrace_get_perpkt_count(trace))  (live_count && final)147 while ((allactive && min_queue != 1)  (live_count && final) 63 148  (live_count && prev_min >= min_key)) { 64 149 /* Get the minimum queue and then do stuff */ … … 70 155 71 156 // Now update the one we just removed 72 if (libtrace_deque_get_size(&queues[min_queue]) ) 73 { 74 libtrace_deque_peek_front(&queues[min_queue], (void *) &r); 75 key[min_queue] = r.key; 76 if (key[min_queue] <= min_key) { 77 // We are still the smallest, might be out of order though :( 78 min_key = key[min_queue]; 79 } else { 80 min_key = key[min_queue]; // Update our minimum 81 // Check all find the smallest again  all are alive 82 for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) { 83 if (live[i] && min_key > key[i]) { 84 min_key = key[i]; 85 min_queue = i; 86 } 87 } 88 } 157 peeked = next_message(&queues[min_queue]); 158 if (libtrace_deque_get_size(&queues[min_queue]) && 159 peeked != 0) { 160 161 key[min_queue] = peeked; 162 // We are still the smallest, might be out of order :( 163 if (key[min_queue] <= min_key) { 164 min_key = key[min_queue]; 165 } else { 166 min_key = key[min_queue]; // Update our minimum 167 // Check all find the smallest again  all are alive 168 for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) { 169 if (live[i] && min_key >= key[i]) { 170 min_key = key[i]; 171 min_queue = i; 172 } 173 } 174 } 89 175 } else { 90 live[min_queue] = false; 176 allactive = false; 177 live[min_queue] = false; 91 178 live_count; 92 179 prev_min = min_key; … … 104 191 105 192 static void read(libtrace_t *trace, libtrace_combine_t *c) { 106 read_internal(trace, c >queues, false);193 read_internal(trace, c, false); 107 194 } 108 195 109 196 static void read_final(libtrace_t *trace, libtrace_combine_t *c) { 110 read_internal(trace, c>queues, true); 197 int empty = 0, i; 198 libtrace_queue_t *q = c>queues; 199 200 do { 201 read_internal(trace, c, true); 202 empty = 0; 203 for (i = 0; i < libtrace_get_perpkt_count(trace); ++i) { 204 if (libtrace_deque_get_size(&q[i]) == 0) 205 empty ++; 206 } 207 } 208 while (empty < libtrace_get_perpkt_count(trace)); 111 209 } 112 210 … … 139 237 pause, /* pause */ 140 238 NULL, /* queues */ 239 0, /* last_count_tick */ 240 0, /* last_ts_tick */ 141 241 {0} /* opts */ 142 242 };
Note: See TracChangeset
for help on using the changeset viewer.