Changeset 5865c72


Ignore:
Timestamp:
03/25/09 17:02:51 (13 years ago)
Author:
Daniel Lawson <dlawson@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, getfragoff, help, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
5745e78
Parents:
37f537f
Message:

Added dag_dump_packet(), to abstract the mechanics of actually writing to the dag card. This function wraps up a performance optimisation - when we request tx space, we request a large amount and memcpy into this, then once this reaches a threshold, we commit the entire thing. This could do with some tweaking, and perhaps a freshness check. Also fixed a size/alignment issue which was causing tx problems.

dag_write_packet() now attempts to convert from other packet types into ERF format, as per erf_write_packet() in format_erf.c, however THIS HAS NOT BEEN TESTED. Use at your own risk, and fix it yourself when it breaks :)

dag_fin_output() now includes a call to dag_tx_stream_commit_bytes() to flush the txbuffer on program close, and then a call to dag_tx_get_stream_space() for nearly the size of the stream buffer. Note that we don't request the exact size (we request a 64bit word less than the exact size), as this doesn't work. The call to dag_tx_get_stream_space() allows the tx stream buffer on the card to drain before we detach - otherwise we'd lose anything still in the tx buffer.

This means that trace_destroy_output() MUST be called if you want to be sure that you actually finish transmitting!

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_dag25.c

    r35c5a72 r5865c72  
    9191        uint32_t processed;
    9292        uint64_t drops;
     93        uint64_t waiting;
     94        uint8_t *txbuffer;
    9395};
    9496
     
    112114pthread_mutex_t open_dag_mutex;
    113115struct dag_dev_t *open_dags = NULL;
     116
     117/* Dag erf ether packets have a 2 byte padding before the packet
     118 * so that the ip header is aligned on a 32 bit boundary.
     119 */
     120static int dag_get_padding(const libtrace_packet_t *packet)
     121{
     122        if (packet->trace->format->type==TRACE_FORMAT_ERF) {
     123                dag_record_t *erfptr = (dag_record_t *)packet->header;
     124                switch(erfptr->type) {
     125                        case TYPE_ETH:
     126                        case TYPE_DSM_COLOR_ETH:
     127                                return 2;
     128                        default:                return 0;
     129                }
     130        }
     131        else {
     132                switch(trace_get_link_type(packet)) {
     133                        case TRACE_TYPE_ETH:    return 2;
     134                        default:                return 0;
     135                }
     136        }
     137}
    114138
    115139static int dag_probe_filename(const char *filename)
     
    138162        FORMAT_DATA_OUT->bottom = NULL;
    139163        FORMAT_DATA_OUT->top = NULL;
     164        FORMAT_DATA_OUT->waiting = 0;
    140165
    141166}
     
    280305        struct dag_dev_t *dag_device = NULL;
    281306        int stream = 1;
     307        unsigned long wake_time;
     308        dagutil_sleep_get_wake_time(&wake_time,0);
    282309
    283310        dag_init_format_out_data(libtrace);
     
    403430
    404431        /* We don't want the dag card to do any sleeping */
    405         /*
    406         dag_set_stream_poll(FORMAT_DATA->device->fd,
    407                         FORMAT_DATA->dagstream, 0, &zero,
     432
     433        dag_set_stream_poll(FORMAT_DATA_OUT->device->fd,
     434                        FORMAT_DATA_OUT->dagstream, 0, &zero,
    408435                        &nopoll);
    409 */
     436
    410437        return 0;
    411438}
     
    504531
    505532static int dag_fin_output(libtrace_out_t *libtrace) {
     533        // commit any outstanding traffic in the txbuffer
     534        if (FORMAT_DATA_OUT->waiting) {
     535                dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
     536                                FORMAT_DATA_OUT->waiting );
     537        }
     538
     539        // wait until the buffer is nearly clear before exiting the program, as we
     540        // will lose packets otherwise
     541        dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     542                        FORMAT_DATA_OUT->dagstream,
     543                        dag_get_stream_buffer_size(FORMAT_DATA_OUT->device->fd,
     544                                        FORMAT_DATA_OUT->dagstream) - 8
     545                        );
     546
    506547        pthread_mutex_lock(&open_dag_mutex);
    507548        if (FORMAT_DATA_OUT->stream_attached)
     
    627668}
    628669
     670/*
     671 * dag_write_packet() at this stage attempts to improve tx performance
     672 * by delaying sending a dag_tx_stream_commit_bytes() until a threshold
     673 * has been met. I observed approximately 270% performance increase
     674 * through this relatively naive tweak. No optimisation of buffer sizes
     675 * was attempted.
     676 */
     677
     678static int dag_dump_packet(libtrace_out_t *libtrace,
     679                dag_record_t *erfptr, unsigned int pad, void *buffer) {
     680        //int numbytes = 0;
     681        int size;
     682
     683        /*
     684         * If we've got 0 bytes waiting in the txqueue, assume that we haven't
     685         * requested any space yet, and request some, storing the pointer at
     686         * FORMAT_DATA_OUT->txbuffer.
     687         *
     688         * The amount to request is slightly magical at the moment - it's
     689         * 16Mebibytes + 128 kibibytes to ensure that we can copy a packet into
     690         * the buffer and handle overruns.
     691         */
     692        if (FORMAT_DATA_OUT->waiting == 0) {
     693                FORMAT_DATA_OUT->txbuffer = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd,
     694                                FORMAT_DATA_OUT->dagstream, 16908288);
     695        }
     696
     697        /*
     698         * Copy the header separately to the body, as we can't guarantee they are
     699         * in contiguous memory
     700         */
     701        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,erfptr,(dag_record_size + pad));
     702        FORMAT_DATA_OUT->waiting += (dag_record_size + pad);
     703
     704
     705
     706        /*
     707         * Copy our incoming packet into the outgoing buffer, and increment our waiting count
     708         */
     709        size = ntohs(erfptr->rlen)-(dag_record_size + pad);
     710        memcpy(FORMAT_DATA_OUT->txbuffer + FORMAT_DATA_OUT->waiting,buffer,size);
     711        FORMAT_DATA_OUT->waiting += size;
     712
     713        /*
     714         * if our output buffer has more than 16 Mebibytes in it, commit those bytes and
     715         * reset the waiting count to 0.
     716         * Note: dag_fin_output will also call dag_tx_stream_commit_bytes() in case
     717         * there is still data in the buffer at program exit.
     718         */
     719
     720        if (FORMAT_DATA_OUT->waiting >= 16*1024*1024) {
     721                FORMAT_DATA_OUT->txbuffer = dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
     722                        FORMAT_DATA_OUT->waiting );
     723                FORMAT_DATA_OUT->waiting = 0;
     724        }
     725
     726        return size + pad + dag_record_size;
     727
     728}
     729
     730static bool find_compatible_linktype(libtrace_out_t *libtrace,
     731                                libtrace_packet_t *packet)
     732{
     733         // Keep trying to simplify the packet until we can find
     734         //something we can do with it
     735
     736        do {
     737                char type=libtrace_to_erf_type(trace_get_link_type(packet));
     738
     739                // Success
     740                if (type != (char)-1)
     741                        return true;
     742
     743                if (!demote_packet(packet)) {
     744                        trace_set_err_out(libtrace,
     745                                        TRACE_ERR_NO_CONVERSION,
     746                                        "No erf type for packet (%i)",
     747                                        trace_get_link_type(packet));
     748                        return false;
     749                }
     750
     751        } while(1);
     752
     753        return true;
     754}
    629755
    630756static int dag_write_packet(libtrace_out_t *libtrace, libtrace_packet_t *packet) {
    631         int err;
    632         void *record;
    633         int size = trace_get_capture_length(packet) + dag_record_size;
    634         size = size + (8 - (size % 8));
    635 
    636 
    637         //dag_record_t *erf = packet->buffer;
    638 /*
    639         record = dag_tx_get_stream_space(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream, 65535);
    640 
    641         memcpy(record,packet->buffer,size);
    642 
    643         dag_tx_stream_commit_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    644                         size );
    645 */
    646 
    647 
    648 
    649         if (dag_tx_stream_copy_bytes(FORMAT_DATA_OUT->device->fd, FORMAT_DATA_OUT->dagstream,
    650                         packet->buffer, size) == -1 ) {
    651                 trace_set_err_out(libtrace, errno, "dag_tx_stream_copy_bytes failed!");
    652                 return 1;
    653         }
    654 
    655         return 0;
     757        /*
     758         * This is heavily borrowed from erf_write_packet(). Yes, CnP coding sucks,
     759         * sorry about that.
     760         */
     761        unsigned int pad = 0;
     762        int numbytes;
     763        void *payload = packet->payload;
     764        dag_record_t *header = (dag_record_t *)packet->header;
     765
     766        FORMAT_DATA_OUT->processed ++;
     767        if(!packet->header) {
     768                // No header, probably an RT packet. Lifted from erf_write_packet().
     769                return -1;
     770        }
     771
     772        pad = dag_get_padding(packet);
     773
     774        /*
     775         * if the payload is null, adjust the rlen. Discussion of this is
     776         * attached to erf_write_packet()
     777         */
     778        if (payload == NULL) {
     779                header->rlen = htons(dag_record_size + pad);
     780        }
     781
     782        if (packet->type == TRACE_RT_DATA_ERF) {
     783                numbytes = dag_dump_packet(libtrace,
     784                                header,
     785                                pad,
     786                                payload
     787                                );
     788
     789        } else {
     790                /* Build up a new packet header from the existing header */
     791
     792                // Simplify the packet first - if we can't do this, break early
     793                if (!find_compatible_linktype(libtrace,packet))
     794                        return -1;
     795
     796                dag_record_t erfhdr;
     797
     798                erfhdr.ts = bswap_host_to_le64(trace_get_erf_timestamp(packet));
     799                payload=packet->payload;
     800                pad = dag_get_padding(packet);
     801
     802                /* Flags. Can't do this */
     803                memset(&erfhdr.flags,1,sizeof(erfhdr.flags));
     804                if (trace_get_direction(packet)!=~0U)
     805                        erfhdr.flags.iface = trace_get_direction(packet);
     806
     807                erfhdr.type = libtrace_to_erf_type(trace_get_link_type(packet));
     808
     809                /* Packet length (rlen includes format overhead) */
     810                assert(trace_get_capture_length(packet)>0
     811                                && trace_get_capture_length(packet)<=65536);
     812                assert(erf_get_framing_length(packet)>0
     813                                && trace_get_framing_length(packet)<=65536);
     814                assert(trace_get_capture_length(packet)+erf_get_framing_length(packet)>0
     815                      &&trace_get_capture_length(packet)+erf_get_framing_length(packet)<=65536);
     816
     817                erfhdr.rlen = htons(trace_get_capture_length(packet)
     818                        + erf_get_framing_length(packet));
     819
     820
     821                /* loss counter. Can't do this */
     822                erfhdr.lctr = 0;
     823                /* Wire length, does not include padding! */
     824                erfhdr.wlen = htons(trace_get_wire_length(packet));
     825
     826                /* Write it out */
     827                numbytes = dag_dump_packet(libtrace,
     828                                &erfhdr,
     829                                pad,
     830                                payload);
     831
     832        }
     833
     834        return numbytes;
    656835}
    657836
Note: See TracChangeset for help on using the changeset viewer.