Ignore:
Timestamp:
08/19/14 13:59:33 (7 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
f0e8bd6
Parents:
957a72a
Message:

Adds a configuration parser to make it easy to change the parallel configuration.
Adds more configuration options (Tidies some verbose debugging output).
Implements tick packets for the hasher thread case.
Some other minor bug fixes

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    rf051c1b r5b4d121  
    101101
    102102
    103 #define VERBOSE_DEBBUGING 1
    104 
    105 
    106103static size_t trace_pread_packet(libtrace_t *libtrace, libtrace_thread_t *t, libtrace_packet_t *packets[], size_t nb_packets);
    107104
     
    215212        }
    216213
    217 #if VERBOSE_DEBBUGING
    218         fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
    219                 prev_state, t->state);
    220 #endif
     214        if (trace->config.debug_state)
     215                fprintf(stderr, "Thread %d state changed from %d to %d\n", (int) t->tid,
     216                        prev_state, t->state);
     217
    221218        if (need_lock)
    222219                pthread_mutex_unlock(&trace->libtrace_lock);
     
    240237        prev_state = trace->state;
    241238        trace->state = new_state;
    242 #if VERBOSE_DEBBUGING
    243         fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
    244                 trace->uridata, get_trace_state_name(prev_state),
    245                 get_trace_state_name(trace->state));
    246 #endif
     239
     240        if (trace->config.debug_state)
     241                fprintf(stderr, "Trace(%s) state changed from %s to %s\n",
     242                        trace->uridata, get_trace_state_name(prev_state),
     243                        get_trace_state_name(trace->state));
     244
    247245        if (need_lock)
    248246                pthread_mutex_unlock(&trace->libtrace_lock);
     
    342340                res->value = (void *)dup;
    343341                trace_destroy_packet(oldpkt);
    344                 fprintf(stderr, "Made a packet safe!!\n");
    345342        }
    346343}
     
    375372
    376373
     374
     375/**
     376 * Dispatches packets to their correct place and applies any translations
     377 * as needed
     378 * @param trace
     379 * @param t
     380 * @param packet (in, out) this will be set to NULL if the user doesn't return the packet for reuse
     381 * @return -1 if an error or EOF has occured and the trace should end otherwise 0 to continue as normal
     382 */
     383static inline int dispatch_packets(libtrace_t *trace, libtrace_thread_t *t, libtrace_packet_t **packets,
     384                                   size_t nb_packets) {
     385        libtrace_message_t message;
     386        size_t i;
     387        for (i = 0; i < nb_packets; ++i) {
     388                if (packets[i]->error > 0) {
     389                        packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
     390                } else if (packets[i]->error == READ_TICK) {
     391                        message.code = MESSAGE_TICK;
     392                        message.additional.uint64 = trace_packet_get_order(packets[i]);
     393                        message.sender = t;
     394                        (*trace->per_pkt)(trace, NULL, &message, t);
     395                } else if (packets[i]->error != READ_MESSAGE) {
     396                        // An error this should be the last packet we read
     397                        size_t z;
     398                        // We could have an eof or error and a message such as pause
     399                        for (z = i ; z < nb_packets; ++z) {
     400                                fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
     401                                assert (packets[z]->error <= 0);
     402                        }
     403                        return -1;
     404                }
     405                // -2 is a message its not worth checking now just finish this lot and we'll check
     406                // when we loop next
     407        }
     408        return 0;
     409}
     410
    377411/**
    378412 * The is the entry point for our packet processing threads.
     
    417451                                        message.sender = t;
    418452                                        (*trace->per_pkt)(trace, NULL, &message, t);
    419                                         // If a hasher thread is running empty input queues so we don't loose data
     453                                        // If a hasher thread is running empty input queues so we don't lose data
    420454                                        if (trace_has_dedicated_hasher(trace)) {
    421455                                                fprintf(stderr, "Trace is using a hasher thread emptying queues\n");
     
    423457                                                while(!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
    424458                                                        ASSERT_RET(trace_pread_packet(trace, t, packets, 1), == 1);
    425                                                         if (packets[0]->error > 0)
    426                                                                 packets[0] = (*trace->per_pkt)(trace, packets[0], NULL, t);
    427                                                         else if (packets[0]->error != -2) {
     459                                                        if (dispatch_packets(trace, t, packets, 1) == -1) {
    428460                                                                // EOF or error, either way we'll stop
    429461                                                                while (!libtrace_ringbuffer_is_empty(&t->rbuffer)) {
     
    460492                }
    461493                // Loop through the packets we just read
    462                 for (i = 0; i < nb_packets; ++i) {
    463                        
    464                         if (packets[i]->error > 0) {
    465                                 packets[i] = (*trace->per_pkt)(trace, packets[i], NULL, t);
    466                         } else if (packets[i]->error != -2) {
    467                                 // An error this should be the last packet we read
    468                                 size_t z;
    469                                 // We could have an eof or error and a message such as pause
    470                                 for (z = i ; z < nb_packets; ++z) {
    471                                         fprintf(stderr, "i=%d nb_packet=%d err=%d\n", (int) z, (int) nb_packets, packets[z]->error);
    472                                         assert (packets[z]->error < 1);
    473                                 }
    474                                 goto stop;
    475                         }
    476                         // -2 is a message its not worth checking now just finish this lot and we'll check
    477                         // when we loop next
    478                 }
     494                if (dispatch_packets(trace, t, packets, nb_packets) == -1)
     495                        break;
    479496        }
    480497
     
    523540 * core to process it.
    524541 */
    525 static void* hasher_start(void *data) {
     542static void* hasher_entry(void *data) {
    526543        libtrace_t *trace = (libtrace_t *)data;
    527544        libtrace_thread_t * t;
     
    569586                                        assert(trace->started == false);
    570587                                        assert(trace->state == STATE_FINSHED);
     588                                        break;
    571589                                default:
    572590                                        fprintf(stderr, "Hasher thread didn't expect message code=%d\n", message.code);
     
    585603                /* Blocking write to the correct queue - I'm the only writer */
    586604                if (trace->perpkt_threads[thread].state != THREAD_FINISHED) {
     605                        uint64_t order = trace_packet_get_order(packet);
    587606                        libtrace_ringbuffer_write(&trace->perpkt_threads[thread].rbuffer, packet);
     607                        if (trace->config.tick_count && order % trace->config.tick_count == 0) {
     608                                // Write ticks to everyone else
     609                                libtrace_packet_t * pkts[trace->perpkt_thread_count];
     610                                memset(pkts, 0, sizeof(void *) * trace->perpkt_thread_count);
     611                                libtrace_ocache_alloc(&trace->packet_freelist, (void **) pkts, trace->perpkt_thread_count, trace->perpkt_thread_count);
     612                                for (i = 0; i < trace->perpkt_thread_count; i++) {
     613                                        pkts[i]->error = READ_TICK;
     614                                        trace_packet_set_order(pkts[i], order);
     615                                        libtrace_ringbuffer_write(&trace->perpkt_threads[i].rbuffer, pkts[i]);
     616                                }
     617                        }
    588618                        pkt_skipped = 0;
    589619                } else {
     
    705735{
    706736        size_t i = 0;
     737        bool tick_hit = false;
    707738
    708739        nb_packets = fill_array_with_empty_packets(libtrace, packets, nb_packets);
     
    712743        for (i = 0; i < nb_packets; ++i) {
    713744                packets[i]->error = trace_read_packet(libtrace, packets[i]);
    714                 // Doing this inside the lock ensures the first packet is always
    715                 // recorded first
    716745                if (packets[i]->error <= 0) {
    717746                        ++i;
    718747                        break;
    719748                }
    720         }
     749                /*
     750                if (libtrace->config.tick_count && trace_packet_get_order(packets[i]) % libtrace->config.tick_count == 0) {
     751                        tick_hit = true;
     752                }*/
     753        }
     754        // Doing this inside the lock ensures the first packet is always
     755        // recorded first
    721756        if (packets[0]->error > 0) {
    722757                store_first_packet(libtrace, packets[0], t);
    723758        }
    724759        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     760        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
     761        if (tick_hit) {
     762                libtrace_message_t tick;
     763                tick.additional.uint64 = trace_packet_get_order(packets[i]);
     764                tick.code = MESSAGE_TICK;
     765                trace_send_message_to_perpkts(libtrace, &tick);
     766        } */
    725767        return i;
    726768}
     
    739781                libtrace_ocache_free(&libtrace->packet_freelist, (void **) packets, 1, 1);
    740782        packets[0] = libtrace_ringbuffer_read(&t->rbuffer);
    741 
    742 
    743         if (packets[0] == NULL) {
    744                 libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) packets, 1, 1);
    745                 packets[0]->error = -2;
    746         }
    747783
    748784        if (packets[0]->error < 0)
     
    756792                        break;
    757793                }
    758                 // Message wating
    759                 if (packets[i] == NULL) {
    760                         libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &packets[i], 1, 1);
    761                         packets[i]->error = -2;
    762                         ++i;
     794                // These are typically urgent
     795                if (packets[i]->error < 0)
    763796                        break;
    764                 }
    765797        }
    766798       
    767799        return i;
    768         /*if (*packet) {
    769                 return (*packet)->error;
    770         } else {
    771                 // This is how we do a notify, we send a message before hand to note that the trace is over etc.
    772                 // And this will notify the perpkt thread to read that message, this is easiest
    773                 // since cases like pause can also be dealt with this way without actually
    774                 // having to be the end of the stream.
    775                 fprintf(stderr, "Got a NULL packet from hasher therefore message waiting\n");
    776                 return -2;
    777         }*/
    778800}
    779801
     
    11661188
    11671189        while (!trace_finished(trace)) {
    1168 
    1169                 //while ( != LIBTRACE_MQ_FAILED) { }
    1170                 libtrace_message_queue_get(&t->messages, &message);
    1171 
     1190                if (trace->config.reporter_polling) {
     1191                        if (libtrace_message_queue_try_get(&t->messages, &message) == LIBTRACE_MQ_FAILED)
     1192                                message.code = MESSAGE_POST_REPORTER;
     1193                } else {
     1194                        libtrace_message_queue_get(&t->messages, &message);
     1195                }
    11721196                switch (message.code) {
    11731197                        // Check for results
     
    11981222                (*trace->reporter)(trace, &result, NULL);
    11991223        }
     1224        libtrace_vector_destroy(&results);
    12001225
    12011226        // GOODBYE
     
    13201345                        packet->trace = libtrace;
    13211346                        ret=libtrace->format->pread_packet(libtrace, t, packet);
    1322                         if (ret==(size_t)-1 || ret==(size_t)-2 || ret==0) {
     1347                        if (ret <= 0) {
    13231348                                return ret;
    13241349                        }
     
    14991524        if (libtrace->config.packet_thread_cache_size <= 0)
    15001525                libtrace->config.packet_thread_cache_size = 20;
    1501         if (libtrace->config.packet_global_cache_size <= 0)
    1502                 libtrace->config.packet_global_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
    1503 
    1504         if (libtrace->config.packet_global_cache_size <
     1526        if (libtrace->config.packet_cache_size <= 0)
     1527                libtrace->config.packet_cache_size = (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count;
     1528
     1529        if (libtrace->config.packet_cache_size <
    15051530                (libtrace->config.hasher_queue_size + 1) * libtrace->perpkt_thread_count)
    15061531                fprintf(stderr, "WARNING deadlocks may occur and extra memory allocating buffer sizes (packet_freelist_size) mismatched\n");
     
    15231548                t->state = THREAD_RUNNING;
    15241549                libtrace_message_queue_init(&t->messages, sizeof(libtrace_message_t));
    1525                 ASSERT_RET(pthread_create(&t->tid, NULL, hasher_start, (void *) libtrace), == 0);
     1550                ASSERT_RET(pthread_create(&t->tid, NULL, hasher_entry, (void *) libtrace), == 0);
    15261551                snprintf(name, sizeof(name), "hasher-thread");
    15271552                pthread_setname_np(t->tid, name);
     
    15341559                                                 (void (*)(void *))trace_destroy_packet,
    15351560                                                 libtrace->config.packet_thread_cache_size,
    1536                                                  libtrace->config.packet_global_cache_size * 4,
     1561                                                 libtrace->config.packet_cache_size * 4,
    15371562                                                 libtrace->config.fixed_packet_count);
    15381563        // Unused slidingwindow code
     
    15621587                t->perpkt_num = i;
    15631588                if (libtrace->hasher)
    1564                         libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size, LIBTRACE_RINGBUFFER_POLLING);
     1589                        libtrace_ringbuffer_init(&t->rbuffer, libtrace->config.hasher_queue_size,
     1590                                                 libtrace->config.hasher_polling?LIBTRACE_RINGBUFFER_POLLING:0);
    15651591                // Depending on the mode vector or deque might be chosen
    15661592                libtrace_vector_init(&t->vector, sizeof(libtrace_result_t));
     
    16541680        // Special case handle the hasher thread case
    16551681        if (trace_has_dedicated_hasher(libtrace)) {
    1656                 fprintf(stderr, "Hasher thread running we deal with this special!\n");
     1682                if (libtrace->config.debug_state)
     1683                        fprintf(stderr, "Hasher thread is running, asking it to pause ...");
    16571684                libtrace_message_t message = {0};
    16581685                message.code = MESSAGE_DO_PAUSE;
     
    16641691                }
    16651692                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1666         }
    1667 
    1668         fprintf(stderr, "Sending messages \n");
     1693                if (libtrace->config.debug_state)
     1694                        fprintf(stderr, " DONE\n");
     1695        }
     1696
     1697        if (libtrace->config.debug_state)
     1698                fprintf(stderr, "Asking perpkt threads to pause ...");
    16691699        // Stop threads, skip this one if it's a perpkt
    16701700        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
     
    16761706                                // The hasher has stopped and other threads have messages waiting therefore
    16771707                                // If the queues are empty the other threads would have no data
    1678                                 // So send some NULL packets to simply ask the threads to check there message queues
     1708                                // So send some message packets to simply ask the threads to check
    16791709                                // We are the only writer since hasher has paused
    1680                                 libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, NULL);
     1710                                libtrace_packet_t *pkt;
     1711                                libtrace_ocache_alloc(&libtrace->packet_freelist, (void **) &pkt, 1, 1);
     1712                                pkt->error = READ_MESSAGE;
     1713                                libtrace_ringbuffer_write(&libtrace->perpkt_threads[i].rbuffer, pkt);
    16811714                        }
    16821715                } else {
     
    16851718        }
    16861719
     1720        if (t) {
     1721                // A perpkt is doing the pausing, interesting, fake an extra thread paused
     1722                // We rely on the user to *not* return before starting the trace again
     1723                thread_change_state(libtrace, t, THREAD_PAUSED, true);
     1724        }
     1725
     1726        // Wait for all threads to pause
     1727        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     1728        while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
     1729                ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
     1730        }
     1731        ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     1732
     1733        if (libtrace->config.debug_state)
     1734                fprintf(stderr, " DONE\n");
     1735
    16871736        // Deal with the reporter
    16881737        if (trace_has_dedicated_reporter(libtrace)) {
    1689                 fprintf(stderr, "Reporter thread running we deal with this special!\n");
     1738                if (libtrace->config.debug_state)
     1739                        fprintf(stderr, "Reporter thread is running, asking it to pause ...");
    16901740                libtrace_message_t message = {0};
    16911741                message.code = MESSAGE_DO_PAUSE;
     
    16971747                }
    16981748                ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1699         }
    1700 
    1701         // Formats must support native message handling if a message is ready
    1702         // Approach per Perry's suggestion is a non-blocking read
    1703         // followed by a blocking read. XXX STRIP THIS OUT
    1704 
    1705         if (t) {
    1706                 // A perpkt is doing the pausing, interesting, fake an extra thread paused
    1707                 // We rely on the user to *not* return before starting the trace again
    1708                 thread_change_state(libtrace, t, THREAD_PAUSED, true);
    1709         }
    1710 
    1711         fprintf(stderr, "Asking threads to pause\n");
    1712 
    1713         // Wait for all threads to pause
    1714         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    1715         while(libtrace->perpkt_thread_states[THREAD_RUNNING]) {
    1716                 ASSERT_RET(pthread_cond_wait(&libtrace->perpkt_cond, &libtrace->libtrace_lock), == 0);
    1717         }
    1718         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
    1719 
    1720         fprintf(stderr, "Threads have paused\n");
     1749                if (libtrace->config.debug_state)
     1750                        fprintf(stderr, " DONE\n");
     1751        }
    17211752
    17221753        if (trace_supports_parallel(libtrace) && !trace_has_dedicated_hasher(libtrace)) {
     
    18661897
    18671898        /* Now the hasher */
    1868         // XXX signal it to stop if it hasn't already we should never be in this situation!!
    18691899        if (trace_has_dedicated_hasher(libtrace)) {
    1870                 fprintf(stderr, "Waiting to join with the hasher\n");
    18711900                pthread_join(libtrace->hasher_thread.tid, NULL);
    1872                 fprintf(stderr, "Joined with the hasher\n");
    18731901                assert(libtrace->hasher_thread.state == THREAD_FINISHED);
    18741902        }
     
    18921920
    18931921        if (trace_has_dedicated_reporter(libtrace)) {
    1894                 fprintf(stderr, "Waiting to join with the reporter\n");
    18951922                pthread_join(libtrace->reporter_thread.tid, NULL);
    1896                 fprintf(stderr, "Joined with the reporter\n");
    18971923                assert(libtrace->reporter_thread.state == THREAD_FINISHED);
    18981924        }
     
    19021928                libtrace_message_t msg = {0};
    19031929                msg.code = MESSAGE_DO_STOP;
    1904                 fprintf(stderr, "Waiting to join with the keepalive\n");
    19051930                trace_send_message_to_thread(libtrace, &libtrace->keepalive_thread, &msg);
    19061931                pthread_join(libtrace->keepalive_thread.tid, NULL);
    1907                 fprintf(stderr, "Joined with with the keepalive\n");
    19081932        }
    19091933       
     
    22262250                                libtrace->tracetime = 0;
    22272251                        return 0;
     2252                case TRACE_OPTION_SET_CONFIG:
     2253                        libtrace->config = *((struct user_configuration *) value);
     2254                case TRACE_OPTION_GET_CONFIG:
     2255                        *((struct user_configuration *) value) = libtrace->config;
    22282256        }
    22292257        return 0;
     2258}
     2259
     2260static bool config_bool_parse(char *value, size_t nvalue) {
     2261        if (strncmp(value, "true", nvalue) == 0)
     2262                return true;
     2263        else if (strncmp(value, "false", nvalue) == 0)
     2264                return false;
     2265        else
     2266                return strtoll(value, NULL, 10) != 0;
     2267}
     2268
     2269static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
     2270        assert(key);
     2271        assert(value);
     2272        assert(uc);
     2273        if (strncmp(key, "packet_cache_size", nkey) == 0
     2274                || strncmp(key, "pcs", nkey) == 0) {
     2275                uc->packet_cache_size = strtoll(value, NULL, 10);
     2276        } else if (strncmp(key, "packet_thread_cache_size", nkey) == 0
     2277                           || strncmp(key, "ptcs", nkey) == 0) {
     2278                uc->packet_thread_cache_size = strtoll(value, NULL, 10);
     2279        } else if (strncmp(key, "fixed_packet_count", nkey) == 0
     2280                  || strncmp(key, "fpc", nkey) == 0) {
     2281                uc->fixed_packet_count = config_bool_parse(value, nvalue);
     2282        } else if (strncmp(key, "burst_size", nkey) == 0
     2283                   || strncmp(key, "bs", nkey) == 0) {
     2284                uc->burst_size = strtoll(value, NULL, 10);
     2285        } else if (strncmp(key, "tick_interval", nkey) == 0
     2286                   || strncmp(key, "ti", nkey) == 0) {
     2287                uc->tick_interval = strtoll(value, NULL, 10);
     2288        } else if (strncmp(key, "tick_count", nkey) == 0
     2289                   || strncmp(key, "tc", nkey) == 0) {
     2290                uc->tick_count = strtoll(value, NULL, 10);
     2291        } else if (strncmp(key, "perpkt_threads", nkey) == 0
     2292                   || strncmp(key, "pt", nkey) == 0) {
     2293                uc->perpkt_threads = strtoll(value, NULL, 10);
     2294        } else if (strncmp(key, "hasher_queue_size", nkey) == 0
     2295                   || strncmp(key, "hqs", nkey) == 0) {
     2296                uc->hasher_queue_size = strtoll(value, NULL, 10);
     2297        } else if (strncmp(key, "hasher_polling", nkey) == 0
     2298                   || strncmp(key, "hp", nkey) == 0) {
     2299                uc->hasher_polling = config_bool_parse(value, nvalue);
     2300        } else if (strncmp(key, "reporter_polling", nkey) == 0
     2301                   || strncmp(key, "rp", nkey) == 0) {
     2302                uc->reporter_polling = config_bool_parse(value, nvalue);
     2303        } else if (strncmp(key, "reporter_thold", nkey) == 0
     2304                   || strncmp(key, "rt", nkey) == 0) {
     2305                uc->reporter_thold = strtoll(value, NULL, 10);
     2306        } else if (strncmp(key, "debug_state", nkey) == 0
     2307                   || strncmp(key, "ds", nkey) == 0) {
     2308                uc->debug_state = config_bool_parse(value, nvalue);
     2309        } else {
     2310                fprintf(stderr, "No matching value %s(=%s)\n", key, value);
     2311        }
     2312}
     2313
     2314DLLEXPORT void parse_user_config(struct user_configuration* uc, char * str) {
     2315        char *pch;
     2316        char key[100];
     2317        char value[100];
     2318        assert(str);
     2319        assert(uc);
     2320        printf ("Splitting string \"%s\" into tokens:\n",str);
     2321        pch = strtok (str," ,.-");
     2322        while (pch != NULL)
     2323        {
     2324                if (sscanf(pch, "%99[^=]=%99s", key, value) == 2) {
     2325                        config_string(uc, key, sizeof(key), value, sizeof(value));
     2326                } else {
     2327                        fprintf(stderr, "Error parsing %s\n", pch);
     2328                }
     2329                pch = strtok (NULL," ,.-");
     2330        }
     2331}
     2332
     2333DLLEXPORT void parse_user_config_file(struct user_configuration* uc, FILE *file) {
     2334        char line[1024];
     2335        while (fgets(line, sizeof(line), file) != NULL)
     2336        {
     2337                parse_user_config(uc, line);
     2338        }
    22302339}
    22312340
Note: See TracChangeset for help on using the changeset viewer.