Changeset 5e3f16c for lib


Ignore:
Timestamp:
12/05/16 19:04:41 (4 years ago)
Author:
Richard Sanger <rsanger@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
adb2c4c, cff1819, f8613e4
Parents:
8ccb8536
Message:

Fix for issue #39 - ring and int pstop() fails on older kernels when using threads

The problem here is that on old kernels without PACKET_FANOUT support
(added in v3.1) will only include the single threaded versions of int
and ring. When used with multiple threads the libtrace API will
fallback to using read rather than pread which does not check message
queues.

To fix this issue, in any format without pread support:

  • We check for new messages with each loop around read_packet as we fill the burst
  • Within read_packet we update the halt to include the pausing state
  • Use a seperate lock to the main lock when reading a burst of packets, otherwise trace_ppause has to wait for a burst to read.

This is not 100% perfect as a single packet might still need to be received
before a generic message can be received.
A proper fix in the future would be to move all format internals purely to the
parallel API.

Location:
lib
Files:
9 edited

Legend:

Unmodified
Added
Removed
  • lib/format_bpf.c

    ree6e802 r5e3f16c  
    497497
    498498                /* Timed out -- check if we should halt */
    499                 if (libtrace_halt)
    500                         return 0;
     499                if ((ret=is_halted(libtrace)) != -1)
     500                        return ret;
    501501        }
    502502       
  • lib/format_dag25.c

    ree6e802 r5e3f16c  
    13041304                                return -2;
    13051305
    1306                         if (libtrace_halt)
    1307                                 return 0;
     1306                        if ((numbytes=is_halted(libtrace)) != -1)
     1307                                return numbytes;
    13081308                        /* Block until we see a packet */
    13091309                        continue;
  • lib/format_dpdk.c

    rc94f107 r5e3f16c  
    21072107                if (mesg && libtrace_message_queue_count(mesg) > 0)
    21082108                        return READ_MESSAGE;
    2109                 if (libtrace_halt)
    2110                         return READ_EOF;
     2109                if ((nb_rx=is_halted(libtrace)) != -1)
     2110                        return nb_rx;
    21112111                /* Wait a while, polling on memory degrades performance
    21122112                 * This relieves the pressure on memory allowing the NIC to DMA */
  • lib/format_linux_int.c

    ree6e802 r5e3f16c  
    215215                                return -1;
    216216                        } else {
    217                                 if (libtrace_halt)
    218                                         return READ_EOF;
     217                                if ((ret=is_halted(libtrace)) != -1)
     218                                        return ret;
    219219                        }
    220220                }
  • lib/format_linux_ring.c

    ree6e802 r5e3f16c  
    510510                } else {
    511511                        /* Poll timed out - check if we should exit */
    512                         if (libtrace_halt)
    513                                 return 0;
     512                        if ((ret=is_halted(libtrace)) != -1)
     513                                return ret;
    514514                        continue;
    515515                }
  • lib/format_pcap.c

    r631fdbc r5e3f16c  
    447447                        case 1: break; /* no error */
    448448                        case 0:
    449                                 if (libtrace_halt)
    450                                         return 0;
     449                                if ((ret=is_halted(libtrace)) != -1)
     450                                        return ret;
    451451                                continue; /* timeout expired */
    452452                        case -1:
  • lib/libtrace_int.h

    ree6e802 r5e3f16c  
    327327        /** Synchronise writes/reads across this format object and attached threads etc */
    328328        pthread_mutex_t libtrace_lock;
     329        /** Packet read lock, seperate from libtrace_lock as to not block while reading a burst */
     330        pthread_mutex_t read_packet_lock;
    329331        /** State */
    330332        enum trace_state state;
     
    606608         * @return The size of the packet read (in bytes) including the capture
    607609         * framing header, or -1 if an error occurs. 0 is returned in the
    608          * event of an EOF.
     610         * event of an EOF or -2 in the case of interrupting the parallel API.
    609611         *
    610612         * If no packets are available for reading, this function should block
     
    10321034 */
    10331035extern volatile int libtrace_halt;
     1036
     1037/**
     1038 * Used by a format to check if trace_interrupt or if a trace_pause/stop has
     1039 * been called. Provides backwards compatibility with traditional read
     1040 * functions when trace_read_packet() is used by the parallel API.
     1041 *
     1042 * Returns -1 if not halting otherwise returns the code that the read
     1043 * operation should pass on.
     1044 */
     1045static inline int is_halted(libtrace_t *trace) {
     1046        if (!(libtrace_halt || trace->state == STATE_PAUSING)) {
     1047                return -1;
     1048        } else if (libtrace_halt) {
     1049                return READ_EOF;
     1050        } else {
     1051                return READ_MESSAGE;
     1052        }
     1053}
    10341054
    10351055/** Registers a new capture format module.
  • lib/trace.c

    r89e2ff7 r5e3f16c  
    261261        /* Parallel inits */
    262262        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     263        ASSERT_RET(pthread_mutex_init(&libtrace->read_packet_lock, NULL), == 0);
    263264        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    264265        libtrace->state = STATE_NEW;
     
    385386        /* Parallel inits */
    386387        ASSERT_RET(pthread_mutex_init(&libtrace->libtrace_lock, NULL), == 0);
     388        ASSERT_RET(pthread_mutex_init(&libtrace->read_packet_lock, NULL), == 0);
    387389        ASSERT_RET(pthread_cond_init(&libtrace->perpkt_cond, NULL), == 0);
    388390        libtrace->state = STATE_NEW; // TODO MAYBE DEAD
     
    680682
    681683        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     684        ASSERT_RET(pthread_mutex_destroy(&libtrace->read_packet_lock), == 0);
    682685        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
    683686
     
    758761
    759762        ASSERT_RET(pthread_mutex_destroy(&libtrace->libtrace_lock), == 0);
     763        ASSERT_RET(pthread_mutex_destroy(&libtrace->read_packet_lock), == 0);
    760764        ASSERT_RET(pthread_cond_destroy(&libtrace->perpkt_cond), == 0);
    761765
     
    921925                        size_t ret;
    922926                        int filtret;
    923                         if (libtrace_halt)
    924                                 return 0;
     927                        if ((ret=is_halted(libtrace)) != (size_t)-1)
     928                                return ret;
    925929                        /* Store the trace we are reading from into the packet opaque
    926930                         * structure */
    927931                        packet->trace = libtrace;
    928932                        ret=libtrace->format->read_packet(libtrace,packet);
    929                         if (ret==(size_t)-1 || ret==0) {
     933                        if (ret==(size_t)READ_MESSAGE ||
     934                            ret==(size_t)-1 || ret==0) {
    930935                                packet->trace = NULL;
    931936                                return ret;
  • lib/trace_parallel.c

    r89e2ff7 r5e3f16c  
    816816
    817817                if ((packet->error = trace_read_packet(trace, packet)) <1) {
    818                         break; /* We are EOF or error'd either way we stop  */
     818                        if (packet->error == READ_MESSAGE) {
     819                                pkt_skipped = 1;
     820                                continue;
     821                        } else {
     822                                break; /* We are EOF or error'd either way we stop  */
     823                        }
    819824                }
    820825
     
    884889        //bool tick_hit = false;
    885890
    886         ASSERT_RET(pthread_mutex_lock(&libtrace->libtrace_lock), == 0);
     891        ASSERT_RET(pthread_mutex_lock(&libtrace->read_packet_lock), == 0);
    887892        /* Read nb_packets */
    888893        for (i = 0; i < nb_packets; ++i) {
     894                if (libtrace_message_queue_count(&t->messages) > 0) {
     895                        if ( i==0 ) {
     896                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
     897                                return READ_MESSAGE;
     898                        } else {
     899                                break;
     900                        }
     901                }
    889902                packets[i]->error = trace_read_packet(libtrace, packets[i]);
    890903
     
    892905                        /* We'll catch this next time if we have already got packets */
    893906                        if ( i==0 ) {
    894                                 ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     907                                ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
    895908                                return packets[i]->error;
    896909                        } else {
     
    908921                store_first_packet(libtrace, packets[0], t);
    909922        }
    910         ASSERT_RET(pthread_mutex_unlock(&libtrace->libtrace_lock), == 0);
     923        ASSERT_RET(pthread_mutex_unlock(&libtrace->read_packet_lock), == 0);
    911924        /* XXX TODO this needs to be inband with packets, or we don't bother in this case
    912925        if (tick_hit) {
Note: See TracChangeset for help on using the changeset viewer.