Ignore:
Timestamp:
11/29/18 10:12:59 (2 years ago)
Author:
Jacob Van Walraven <jcv9@…>
Branches:
develop
Children:
fdf23b8
Parents:
d74ca03
Message:

Apply changes required for pull request #81

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/trace_parallel.c

    rb6ff245 r2193905  
    271271DLLEXPORT bool trace_has_reporter(libtrace_t * libtrace)
    272272{
    273         /*assert(libtrace->state != STATE_NEW);*/
    274273        if (!(libtrace->state != STATE_NEW)) {
    275274                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "Cannot check reporter for the current state in trace_has_reporter()");
     
    372371static inline bool trace_supports_parallel(libtrace_t *trace)
    373372{
    374         /*assert(trace);*/
    375373        if (!trace) {
    376374                fprintf(stderr, "NULL trace passed into trace_supports_parallel()\n");
    377375                return false;
    378376        }
    379         /*assert(trace->format);*/
    380377        if (!trace->format) {
    381378                trace_set_err(trace, TRACE_ERR_BAD_FORMAT,
     
    413410                        return &libtrace->perpkt_threads[i];
    414411        }
    415         return NULL;
     412        pthread_exit(NULL);
    416413}
    417414
     
    503500                trace_fin_packet(*packet);
    504501        } else {
    505                 /*assert((*packet)->error == READ_TICK);*/
    506                 if (!((*packet)->error == READ_TICK)) {
     502                if ((*packet)->error != READ_TICK) {
    507503                        trace_set_err(trace, TRACE_ERR_BAD_STATE,
    508                                 "Packet is not in READ_TICK state in dispath_packet()");
     504                                "dispatch_packet() called with invalid 'packet'");
    509505                        return -1;
    510506                }
     
    551547                } else {
    552548                        /* Break early */
    553                         /*assert(ret == READ_MESSAGE);*/
    554                         if (!(ret == READ_MESSAGE)) {
     549                        if (ret != READ_MESSAGE) {
    555550                                trace_set_err(trace, TRACE_ERR_UNKNOWN_OPTION,
    556                                         "Expected READ_MESSAGE in dispatch_packets()");
     551                                        "dispatch_packets() called with at least one invalid packet");
    557552                                return -1;
    558553                        }
     
    606601                        } else if (ret != READ_MESSAGE) {
    607602                                /* Ignore messages we pick these up next loop */
    608                                 /*assert (ret == READ_EOF || ret == READ_ERROR);*/
    609603                                if (!(ret == READ_EOF || ret == READ_ERROR)) {
    610604                                        trace_set_err(trace, TRACE_ERR_PAUSE_PTHREAD,
     
    617611                                        ASSERT_RET(trace->pread(trace, t, &packet, 1), <= 0);
    618612                                        // No packets after this should have any data in them
    619                                         /*assert(packet->error <= 0);*/
    620                                         if (!(packet->error <= 0)) {
    621                                                 trace_set_err(trace, TRACE_ERR_BAD_PACKET,
    622                                                         "Expected no data from packets however found some in trace_perpkt_thread_pause()");
     613                                        if (packet->error > 0) {
     614                                                trace_set_err(trace, TRACE_ERR_BAD_PACKET, "Bogus data in "
     615                                                        "libtrace ring buffer after pausing perpkt thread");
    623616                                                return -1;
    624617                                        }
     
    658651        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    659652        t = get_thread_table(trace);
    660         /*assert(t);*/
    661653        if (!t) {
    662654                trace_set_err(trace, TRACE_ERR_THREAD, "Unable to get thread table in perpkt_threads_entry()");
    663                 return NULL;
     655                pthread_exit(NULL);
    664656        }
    665657        if (trace->state == STATE_ERROR) {
     
    703695                                                goto error;
    704696                                        }
    705                                         /*assert(ret == 1);*/
    706                                         if (!(ret == 1)) {
     697                                        if (ret != 1) {
    707698                                                fprintf(stderr, "Unknown error pausing thread in perpkt_threads_entry()\n");
    708                                                 return NULL;
     699                                                pthread_exit(NULL);
    709700                                        }
    710701                                        continue;
     
    730721                        }
    731722                        if (!trace->pread) {
    732                                 /*assert(packets[0]);*/
    733723                                if (!packets[0]) {
    734                                         fprintf(stderr, "Packet missing in perpkt_threads_entry()\n");
    735                                         return NULL;
     724                                        fprintf(stderr, "Unable to read into NULL packet structure\n");
     725                                        pthread_exit(NULL);
    736726                                }
    737727                                nb_packets = trace_read_packet(trace, packets[0]);
     
    826816        int pkt_skipped = 0;
    827817
    828         /*assert(trace_has_dedicated_hasher(trace));*/
    829818        if (!trace_has_dedicated_hasher(trace)) {
    830819                fprintf(stderr, "Trace does not have hasher associated with it in hasher_entry()\n");
    831                 return NULL;
     820                pthread_exit(NULL);
    832821        }
    833822        /* Wait until all threads are started and objects are initialised (ring buffers) */
    834823        ASSERT_RET(pthread_mutex_lock(&trace->libtrace_lock), == 0);
    835824        t = &trace->hasher_thread;
    836         /*assert(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid));*/
    837825        if (!(t->type == THREAD_HASHER && pthread_equal(pthread_self(), t->tid))) {
    838826                fprintf(stderr, "Incorrect thread type or non matching thread IDs in hasher_entry()\n");
    839                 return NULL;
     827                pthread_exit(NULL);
    840828        }
    841829
     
    857845                if (!pkt_skipped)
    858846                        libtrace_ocache_alloc(&trace->packet_freelist, (void **) &packet, 1, 1);
    859                 /*assert(packet);*/
    860847                if (!packet) {
    861                         fprintf(stderr, "Cannot hash and queue a NULL packet in hasher_entry()\n");
    862                         return NULL;
     848                        fprintf(stderr, "Hasher thread was unable to get a fresh packet from the "
     849                                "object cache\n");
     850                        pthread_exit(NULL);
    863851                }
    864852
     
    879867                                case MESSAGE_DO_STOP:
    880868                                        /* Either FINISHED or FINISHING */
    881                                         /*assert(trace->started == false);*/
    882869                                        if (!(trace->started == false)) {
    883                                                 fprintf(stderr, "Expected trace to have not started in hasher_entry()\n");
    884                                                 return NULL;
     870                                                fprintf(stderr, "STOP message received by hasher, but "
     871                                                        "trace is still active\n");
     872                                                pthread_exit(NULL);
    885873                                        }
    886874                                        /* Mark the current packet as EOF */
     
    922910                        }
    923911                        pkt_skipped = 0;
    924                 } else {
    925                         /*assert(!"Dropping a packet!!");*/
    926                         fprintf(stderr, "Expected THREAD_FINISHED state in hasher_entry()\n");
    927                         return NULL;
    928                         pkt_skipped = 1; // Reuse that packet no one read it
    929912                }
    930913        }
     
    10541037                           sent once*/
    10551038                        if (packets[i]->error != READ_MESSAGE) {
    1056                                 /*assert(t->format_data == NULL);*/
    1057                                 if (!(t->format_data == NULL)) {
    1058                                         trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
    1059                                                 "Expected format data to be NULL in trace_pread_packet_hasher_thread()");
    1060                                         return -1;
    1061                                 }
    10621039                                t->format_data = packets[i];
    10631040                        }
     
    12191196                                break;
    12201197                        case MESSAGE_DO_PAUSE:
    1221                                 /*assert(trace->combiner.pause);*/
    1222                                 if(!(trace->combiner.pause)) {
    1223                                         trace_set_err(trace, TRACE_ERR_COMBINER,
    1224                                                 "Expected combiner to be paused in reporter_entry()");
    1225                                         return NULL;
     1198                                if(trace->combiner.pause) {
     1199                                        trace->combiner.pause(trace, &trace->combiner);
    12261200                                }
    1227                                 trace->combiner.pause(trace, &trace->combiner);
    12281201                                send_message(trace, t, MESSAGE_PAUSING,
    12291202                                                (libtrace_generic_t) {0}, t);
     
    12471220        thread_change_state(trace, &trace->reporter_thread, THREAD_FINISHED, true);
    12481221        print_memory_stats();
    1249         return NULL;
     1222        pthread_exit(NULL);
    12501223}
    12511224
     
    12831256                                libtrace_message_t msg;
    12841257                                libtrace_message_queue_get(&t->messages, &msg);
    1285                                 /*assert(msg.code == MESSAGE_DO_STOP);*/
    1286                                 if (!(msg.code == MESSAGE_DO_STOP)) {
     1258                                if (msg.code != MESSAGE_DO_STOP) {
    12871259                                        fprintf(stderr, "Unexpected message code in keepalive_entry()\n");
    1288                                         return NULL;
     1260                                        pthread_exit(NULL);
    12891261                                }
    12901262                                goto done;
     
    13011273
    13021274        thread_change_state(trace, t, THREAD_FINISHED, true);
    1303         return NULL;
     1275        pthread_exit(NULL);
    13041276}
    13051277
     
    13511323                        return READ_MESSAGE;
    13521324                } else {
    1353                         /*assert(!"trace_delay_packet: Unexpected return from select");*/
    13541325                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET, "Unexpected return from select in delay_tracetime()");
    13551326                        return -1;
     
    14051376                                      size_t nb_packets) {
    14061377        int i;
    1407         /*assert(libtrace && "libtrace is NULL in trace_read_packet()");*/
    14081378        if (!libtrace) {
    14091379                fprintf(stderr, "NULL trace passed into trace_read_packet()\n");
    14101380                return TRACE_ERR_NULL_TRACE;
    14111381        }
    1412         /*assert(nb_packets);*/
    1413         if (!nb_packets) {
     1382        if (nb_packets <= 0) {
    14141383                trace_set_err(libtrace, TRACE_ERR_NULL,
    1415                         "NULL nb_packets passed into trace_pread_packet_wrapper()");
     1384                        "nb_packets must be greater than zero in trace_pread_packet_wrapper()");
    14161385                return -1;
    14171386        }
     
    14271396                int ret;
    14281397                for (i = 0; i < (int) nb_packets; ++i) {
    1429                         /*assert(i[packets]); <--- is this ment to be packets[i]?? */
    14301398                        if (!i[packets]) {
    1431                                 trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in trace_read_packet()");
     1399                                trace_set_err(libtrace, TRACE_ERR_BAD_STATE, "NULL packets in "
     1400                                        "trace_pread_packet_wrapper()");
    14321401                                return -1;
    14331402                        }
     
    14901459        }
    14911460
    1492         /*assert(libtrace_parallel);*/
    14931461        if (!libtrace_parallel) {
    1494                 trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected parallel trace in trace_prestart()");
     1462                trace_set_err(libtrace, TRACE_ERR_THREAD, "Trace_prestart() has been called on a "
     1463                        "non-parallel libtrace input?");
    14951464                return -1;
    14961465        }
    1497         /*assert(!libtrace->perpkt_thread_states[THREAD_RUNNING]);*/
    1498         if (!(!libtrace->perpkt_thread_states[THREAD_RUNNING])) {
    1499                 trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected no running threads in trace_prestart()");
     1466        if (libtrace->perpkt_thread_states[THREAD_RUNNING]) {
     1467                trace_set_err(libtrace, TRACE_ERR_THREAD, "Cannot restart a parallel libtrace input "
     1468                        "while it is still running");
    15001469                return -1;
    15011470        }
     
    15041473        pthread_spin_lock(&libtrace->first_packets.lock);
    15051474        for (i = 0; i < libtrace->perpkt_thread_count; ++i) {
    1506                 /*assert(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet);*/
    1507                 if (!(!!libtrace->perpkt_threads[i].recorded_first == !!libtrace->first_packets.packets[i].packet)) {
    1508                         trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected different first packet in trace_pstart()");
    1509                         return -1;
    1510                 }
    15111475                if (libtrace->first_packets.packets[i].packet) {
    15121476                        trace_destroy_packet(libtrace->first_packets.packets[i].packet);
     
    15181482                }
    15191483        }
    1520         /*assert(libtrace->first_packets.count == 0);*/
    1521         if (!(libtrace->first_packets.count == 0)) {
     1484        if (libtrace->first_packets.count != 0) {
    15221485                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected a first packets count of 0 in trace_pstart()");
    15231486                return -1;
     
    16611624#endif
    16621625        int ret;
    1663         /*assert(t->type == THREAD_EMPTY);*/
    1664         if (!(t->type == THREAD_EMPTY)) {
     1626        if (t->type != THREAD_EMPTY) {
    16651627                trace_set_err(trace, TRACE_ERR_THREAD,
    16661628                        "Expected thread type of THREAD_EMPTY in trace_start_thread()");
     
    16731635        t->state = THREAD_RUNNING;
    16741636
    1675         /*assert(name);*/
    16761637        if (!name) {
    16771638                trace_set_err(trace, TRACE_ERR_THREAD, "NULL thread name in trace_start_thread()");
     
    18001761        char name[24];
    18011762        sigset_t sig_before, sig_block_all;
    1802         /*assert(libtrace);*/
    18031763        if (!libtrace) {
    18041764                fprintf(stderr, "NULL trace passed to trace_pstart()\n");
     
    20261986        ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
    20271987        libtrace_change_state(libtrace, STATE_NEW, false);
    2028         /*assert(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0);*/
    2029         if (!(libtrace->perpkt_thread_states[THREAD_RUNNING] == 0)) {
     1988        if (libtrace->perpkt_thread_states[THREAD_RUNNING] != 0) {
    20301989                trace_set_err(libtrace, TRACE_ERR_THREAD, "Expected 0 running threads in trace_pstart()");
    20311990                return -1;
     
    21222081        libtrace_thread_t *t;
    21232082        int i;
    2124         /*assert(libtrace);*/
    21252083        if (!libtrace) {
    21262084                fprintf(stderr, "NULL trace passed into trace_ppause()\n");
     
    22592217        int i, err;
    22602218        libtrace_message_t message = {0, {.uint64=0}, NULL};
    2261         /*assert(libtrace);*/
    22622219        if (!libtrace) {
    22632220                fprintf(stderr, "NULL trace passed into trace_pstop()\n");
     
    23612318                // the producer (or any other threads) don't block.
    23622319                libtrace_packet_t * packet;
    2363                 /*assert(libtrace->perpkt_threads[i].state == THREAD_FINISHED);*/
    2364                 if (!(libtrace->perpkt_threads[i].state == THREAD_FINISHED)) {
     2320                if (libtrace->perpkt_threads[i].state != THREAD_FINISHED) {
    23652321                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
    2366                                 "Expected processing threads state THREAD_FINISHED in trace_join()");
     2322                                "Expected processing thread state to be THREAD_FINISHED in trace_join()");
    23672323                        return;
    23682324                }
     
    23752331        if (trace_has_dedicated_hasher(libtrace)) {
    23762332                pthread_join(libtrace->hasher_thread.tid, NULL);
    2377                 /*assert(libtrace->hasher_thread.state == THREAD_FINISHED);*/
    2378                 if (!(libtrace->hasher_thread.state == THREAD_FINISHED)) {
     2333                if (libtrace->hasher_thread.state != THREAD_FINISHED) {
    23792334                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
    2380                                 "Expected hasher thread state THREAD_FINISHED in trace_join()");
     2335                                "Expected hasher thread state to be THREAD_FINISHED in trace_join()");
    23812336                        return;
    23822337                }
     
    23922347                        trace_destroy_packet(packet);
    23932348                if (trace_has_dedicated_hasher(libtrace)) {
    2394                         /*assert(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer));*/
    2395                         if (!(libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer))) {
     2349                        if (!libtrace_ringbuffer_is_empty(&libtrace->perpkt_threads[i].rbuffer)) {
    23962350                                trace_set_err(libtrace, TRACE_ERR_THREAD,
    23972351                                        "Expected processing threads ring buffers to be empty in trace_join()");
     
    24052359        if (trace_has_reporter(libtrace)) {
    24062360                pthread_join(libtrace->reporter_thread.tid, NULL);
    2407                 /*assert(libtrace->reporter_thread.state == THREAD_FINISHED);*/
    2408                 if (!(libtrace->reporter_thread.state == THREAD_FINISHED)) {
     2361                if (libtrace->reporter_thread.state != THREAD_FINISHED) {
    24092362                        trace_set_err(libtrace, TRACE_ERR_THREAD_STATE,
    24102363                                "Expected reporting thread state to be THREAD_FINISHED in trace_join()");
     
    25172470        res.key = key;
    25182471        res.value = value;
    2519         /*assert(libtrace->combiner.publish);*/
    25202472        if (!libtrace->combiner.publish) {
    2521                 fprintf(stderr, "Unable to publish result in trace_publish_result()\n");
     2473                fprintf(stderr, "Combiner has no publish method -- can not publish results!\n");
    25222474                return;
    25232475        }
     
    26732625/* Note update documentation on trace_set_configuration */
    26742626static void config_string(struct user_configuration *uc, char *key, size_t nkey, char *value, size_t nvalue) {
    2675         /*assert(key);*/
    26762627        if (!key) {
    26772628                fprintf(stderr, "NULL key passed to config_string()\n");
    26782629                return;
    26792630        }
    2680         /*assert(value);*/
    26812631        if (!value) {
    26822632                fprintf(stderr, "NULL value passed to config_string()\n");
    26832633                return;
    26842634        }
    2685         /*assert(uc);*/
    26862635        if (!uc) {
    26872636                fprintf(stderr, "NULL uc (user_configuration) passed to config_string()\n");
     
    27342683        char value[100];
    27352684        char *dup;
    2736         /*assert(trace);*/
    27372685        if (!trace) {
    27382686                fprintf(stderr, "NULL trace passed into trace_set_configuration()\n");
    27392687                return TRACE_ERR_NULL_TRACE;
    27402688        }
    2741         /*assert(str);*/
    27422689        if (!str) {
    27432690                trace_set_err(trace, TRACE_ERR_CONFIG, "NULL configuration string passed to trace_set_configuration()");
     
    27542701                        config_string(&trace->config, key, sizeof(key), value, sizeof(value));
    27552702                } else {
    2756                         fprintf(stderr, "Error parsing option %s\n", pch);
     2703                        fprintf(stderr, "Error: parsing option %s\n", pch);
    27572704                }
    27582705                pch = strtok (NULL," ,.-");
     
    27792726
    27802727DLLEXPORT void trace_free_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    2781         /*assert(packet);*/
    27822728        if (!packet) {
    27832729                trace_set_err(libtrace, TRACE_ERR_NULL_PACKET,
     
    28152761                return &libtrace->format->info;
    28162762        else
    2817                 return NULL;
    2818 }
     2763                pthread_exit(NULL);
     2764}
Note: See TracChangeset for help on using the changeset viewer.