Changeset 32751a2


Ignore:
Timestamp:
09/04/15 15:23:24 (5 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
0f6bc3f
Parents:
bf986a1
Message:

Make RT safe for use as a parallel input

Previously, the RT format used to assume that only one read packet would
be in use at any given time, so subsequent read_packet calls could overwrite
the buffer space that previous packets had occupied. With parallel libtrace,
this is no longer the case.

For now, I've gone with the safe but slow solution of copying every
complete RT packet out of the receive buffer into memory space that has
been allocated to the libtrace_packet_t. It'd be nice to do this without
the copy, but we'd need to start tracking which packets have been returned
to us to be able to do this properly.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_rt.c

    r7e6b54d r32751a2  
    338338
    339339/* Receives data from an RT server */
    340 static int rt_read(libtrace_t *libtrace, void **buffer, size_t len, int block)
     340static int rt_read(libtrace_t *libtrace, void *buffer, size_t len, int block)
    341341{
    342342        int numbytes;
     
    407407
    408408        }
    409         *buffer = RT_INFO->buf_current;
     409        memcpy(buffer, RT_INFO->buf_current, len);
    410410        RT_INFO->buf_current += len;
    411411        RT_INFO->buf_filled -= len;
     
    585585        uint32_t prep_flags = 0;
    586586
    587         prep_flags |= TRACE_PREP_DO_NOT_OWN_BUFFER;
     587        prep_flags |= TRACE_PREP_OWN_BUFFER;
    588588
    589589        /* The stored RT header will tell us how much data we need to read */
    590         if (rt_read(libtrace, &packet->buffer, (size_t)RT_INFO->rt_hdr.length,
     590        if (rt_read(libtrace, packet->buffer, (size_t)RT_INFO->rt_hdr.length,
    591591                                blocking) != RT_INFO->rt_hdr.length) {
    592592                return -1;
     
    623623static int rt_read_packet_versatile(libtrace_t *libtrace,
    624624                libtrace_packet_t *packet,int blocking) {
     625        rt_header_t hdr;
    625626        rt_header_t *pkt_hdr = NULL;
    626627        void *void_hdr;
    627628        libtrace_rt_types_t switch_type;
    628629       
    629         if (packet->buf_control == TRACE_CTRL_PACKET) {
    630                 packet->buf_control = TRACE_CTRL_EXTERNAL;
    631                 free(packet->buffer);
    632                 packet->buffer = NULL;
    633         }
    634 
    635630        /* RT_LAST indicates that we need to read the RT header for the next
    636631         * packet. This is a touch hax, I admit */
    637632        if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
    638                 void_hdr = (void *)pkt_hdr;
    639633                /* FIXME: Better error handling required */
    640                 if (rt_read(libtrace, &void_hdr,
     634                if (rt_read(libtrace, (void *)&hdr,
    641635                                sizeof(rt_header_t),blocking) !=
    642636                                sizeof(rt_header_t)) {
    643637                        return -1;
    644638                }
    645                 pkt_hdr = (rt_header_t *)void_hdr;
    646                
    647639                /* Need to store these in case the next rt_read overwrites
    648640                 * the buffer they came from! */
    649                 RT_INFO->rt_hdr.type = ntohl(pkt_hdr->type);
    650                 RT_INFO->rt_hdr.length = ntohs(pkt_hdr->length);
    651                 RT_INFO->rt_hdr.sequence = ntohl(pkt_hdr->sequence);
    652         }
    653         packet->type = RT_INFO->rt_hdr.type;
     641                RT_INFO->rt_hdr.type = ntohl(hdr.type);
     642                RT_INFO->rt_hdr.length = ntohs(hdr.length);
     643                RT_INFO->rt_hdr.sequence = ntohl(hdr.sequence);
     644        }
     645
     646        if (packet->buf_control == TRACE_CTRL_PACKET) {
     647                if (packet->buffer == NULL) {
     648                        packet->buffer = malloc(RT_INFO->rt_hdr.length + sizeof(rt_header_t));
     649
     650                } else if (RT_INFO->rt_hdr.length > sizeof(packet->buffer)) {
     651                        packet->buffer = realloc(packet->buffer, RT_INFO->rt_hdr.length + sizeof(rt_header_t));
     652                }
     653        }
     654       
     655        packet->type = RT_INFO->rt_hdr.type;
     656        packet->payload = packet->buffer;
     657       
    654658       
    655659        /* All data-bearing packets (as opposed to RT internal messages)
     
    668672                case TRACE_RT_STATUS:
    669673                case TRACE_RT_METADATA:
    670                         if (rt_read_data_packet(libtrace, packet, blocking))
     674                        if (rt_read_data_packet(libtrace, packet, blocking))
    671675                                return -1;
    672676                        break;
     
    677681                case TRACE_RT_SERVERSTART:
    678682                        /* All these have no payload */
     683                        packet->header = packet->buffer;
     684                        packet->payload = ((char *)packet->buffer + sizeof(rt_header_t));
     685                        pkt_hdr = (rt_header_t *)packet->header;
     686                        pkt_hdr->type = ntohl(RT_INFO->rt_hdr.type);
     687                        pkt_hdr->length = ntohs(RT_INFO->rt_hdr.length);
     688                        pkt_hdr->sequence = ntohl(RT_INFO->rt_hdr.sequence);
     689
     690                        /* XXX Do we need to save the other crap? */
    679691                        break;
    680692                case TRACE_RT_PAUSE_ACK:
     
    689701                        return -1;
    690702        }
    691                                
    692                        
    693                
    694         /* Return the number of bytes read from the stream */
     703       
     704        /* Return the number of bytes read from the stream */
    695705        RT_INFO->rt_hdr.type = TRACE_RT_LAST;
    696706        return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
Note: See TracChangeset for help on using the changeset viewer.