Changeset d391ce0


Ignore:
Timestamp:
09/30/15 13:38:32 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
a151dda
Parents:
e63d80d
Message:

Updated format_rt to use new bucket structure

This should save us from having to memcpy every packet from the read
buffer into the packet buffer.

Added an internalid and srcbucket reference to the libtrace_packet_t
structure, so that trace_fin_packet() can release packets from any buckets
that they are owned by.

Fix race condition on trace->last_packet.

Location:
lib
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • lib/data-struct/buckets.c

    re63d80d rd391ce0  
    164164        }
    165165
    166         while (libtrace_list_get_size(b->nodelist) > 0) {
     166
     167        while (libtrace_list_get_size(b->nodelist) > 1) {
    167168                lnode = libtrace_list_get_index(b->nodelist, 0);
    168169
     
    174175                        break;
    175176
     177                assert(lnode->next != NULL);
    176178                for (i = 0; i < front->slots; i++) {
    177179                        if (front->released[i] == 2) {
  • lib/format_rt.c

    r5478d3d rd391ce0  
    4141#include "format_helper.h"
    4242#include "rt_protocol.h"
     43
     44#include "data-struct/buckets.h"
    4345
    4446#include <sys/stat.h>
     
    8688        char *pkt_buffer;
    8789        /* Pointer to the next packet to be read from the buffer */
    88         char *buf_current;
    89         /* Amount of buffer space used */
    90         size_t buf_filled;
     90        char *buf_read;
     91        /* Pointer to the next unused byte in the buffer */
     92        char *buf_write;
    9193        /* The port to connect to */
    9294        int port;
     
    9597        /* Flag indicating whether the server is doing reliable RT */
    9698        int reliable;
    97 
    98         /* Header for the packet currently being received */
    99         rt_header_t rt_hdr;
    10099
    101100        int unacked;
     
    109108        libtrace_t *dummy_ring;
    110109        libtrace_t *dummy_bpf;
     110
     111        /* Bucket structure for storing read packets until the user is
     112         * done with them. */
     113        libtrace_bucket_t *bucket;
    111114};
    112115
     
    222225        RT_INFO->dummy_bpf = NULL;
    223226        RT_INFO->pkt_buffer = NULL;
    224         RT_INFO->buf_current = NULL;
    225         RT_INFO->buf_filled = 0;
     227        RT_INFO->buf_read = NULL;
     228        RT_INFO->buf_write = NULL;
    226229        RT_INFO->hostname = NULL;
    227230        RT_INFO->port = 0;
    228231        RT_INFO->unacked = 0;
     232
     233        RT_INFO->bucket = libtrace_bucket_init();
    229234}
    230235
     
    280285                return -1;
    281286        }
    282         RT_INFO->rt_hdr.type = TRACE_RT_LAST;
    283287
    284288        return 0;
     
    305309static int rt_fin_input(libtrace_t *libtrace) {
    306310        /* Make sure we clean up any dummy traces that we have been using */
    307        
     311
    308312        if (RT_INFO->dummy_duck)
    309313                trace_destroy_dead(RT_INFO->dummy_duck);
    310314
    311         if (RT_INFO->dummy_erf) 
     315        if (RT_INFO->dummy_erf)
    312316                trace_destroy_dead(RT_INFO->dummy_erf);
    313                
     317
    314318        if (RT_INFO->dummy_pcap)
    315319                trace_destroy_dead(RT_INFO->dummy_pcap);
     
    317321        if (RT_INFO->dummy_linux)
    318322                trace_destroy_dead(RT_INFO->dummy_linux);
    319        
     323
    320324        if (RT_INFO->dummy_ring)
    321325                trace_destroy_dead(RT_INFO->dummy_ring);
     
    323327        if (RT_INFO->dummy_bpf)
    324328                trace_destroy_dead(RT_INFO->dummy_bpf);
     329
     330        if (RT_INFO->bucket)
     331                libtrace_bucket_destroy(RT_INFO->bucket);
    325332        free(libtrace->format_data);
    326333        return 0;
    327334}
    328335
    329 
    330 /* I've upped this to 10K to deal with jumbo-grams that have not been snapped
    331  * in any way. This means we have a much larger memory overhead per packet
    332  * (which won't be used in the vast majority of cases), so we may want to think
    333  * about doing something smarter, e.g. allocate a smaller block of memory and
    334  * only increase it as required.
    335  *
    336  * XXX Capturing off int: can still lead to packets that are larger than 10K,
    337  * in instances where the fragmentation is done magically by the NIC. This
    338  * is pretty nasty, but also very rare.
    339  */
    340 #define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
    341 
    342 /* Receives data from an RT server */
    343 static int rt_read(libtrace_t *libtrace, void *buffer, size_t len, int block)
    344 {
    345         int numbytes;
    346        
    347         assert(len <= RT_BUF_SIZE);
    348        
    349         if (!RT_INFO->pkt_buffer) {
    350                 RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
    351                 RT_INFO->buf_current = RT_INFO->pkt_buffer;
    352                 RT_INFO->buf_filled = 0;
    353         }
    354 
    355 #ifndef MSG_DONTWAIT
    356 #define MSG_DONTWAIT 0
    357 #endif
    358 
    359         if (block)
    360                 block=0;
    361         else
    362                 block=MSG_DONTWAIT;
    363 
    364         /* If we don't have enough buffer space for the amount we want to
    365          * read, move the current buffer contents to the front of the buffer
    366          * to make room */
    367         if (len > RT_INFO->buf_filled) {
    368                 memcpy(RT_INFO->pkt_buffer, RT_INFO->buf_current,
    369                                 RT_INFO->buf_filled);
    370                 RT_INFO->buf_current = RT_INFO->pkt_buffer;
    371 #ifndef MSG_NOSIGNAL
    372 #  define MSG_NOSIGNAL 0
    373 #endif
    374                 /* Loop as long as we don't have all the data that we were
    375                  * asked for */
    376                 while (len > RT_INFO->buf_filled) {
    377                         if ((numbytes = recv(RT_INFO->input_fd,
    378                                                 RT_INFO->buf_current +
    379                                                 RT_INFO->buf_filled,
    380                                                 RT_BUF_SIZE-RT_INFO->buf_filled,
    381                                                 MSG_NOSIGNAL|block)) <= 0) {
    382                                 if (numbytes == 0) {
    383                                         trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
    384                                                         "No data received");
    385                                         return -1;
    386                                 }
    387                                
    388                                 if (errno == EINTR) {
    389                                         /* ignore EINTR in case
    390                                          * a caller is using signals
    391                                          */
    392                                         continue;
    393                                 }
    394                                 if (errno == EAGAIN) {
    395                                         /* We asked for non-blocking mode, so
    396                                          * we need to return now */
    397                                         trace_set_err(libtrace,
    398                                                         EAGAIN,
    399                                                         "EAGAIN");
    400                                         return -1;
    401                                 }
    402                                
    403                                 perror("recv");
    404                                 trace_set_err(libtrace, errno,
    405                                                 "Failed to read data into rt recv buffer");
    406                                 return -1;
    407                         }
    408                         RT_INFO->buf_filled+=numbytes;
     336/* Sends an RT ACK to the server to acknowledge receipt of packets */
     337static int rt_send_ack(libtrace_t *libtrace,
     338                uint32_t seqno)  {
     339       
     340        static char *ack_buffer = 0;
     341        char *buf_ptr;
     342        int numbytes = 0;
     343        size_t to_write = 0;
     344        rt_header_t *hdr;
     345        rt_ack_t *ack_hdr;
     346       
     347        if (!ack_buffer) {
     348                ack_buffer = (char*)malloc(sizeof(rt_header_t)
     349                                                        + sizeof(rt_ack_t));
     350        }
     351       
     352        hdr = (rt_header_t *) ack_buffer;
     353        ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
     354       
     355        hdr->type = htonl(TRACE_RT_ACK);
     356        hdr->length = htons(sizeof(rt_ack_t));
     357
     358        ack_hdr->sequence = htonl(seqno);
     359       
     360        to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
     361        buf_ptr = ack_buffer;
     362
     363        /* Keep trying until we write the entire ACK */
     364        while (to_write > 0) {
     365                numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0);
     366                if (numbytes == -1) {
     367                        if (errno == EINTR || errno == EAGAIN) {
     368                                continue;
     369                        }
     370                        else {
     371                                printf("Error sending ack\n");
     372                                perror("send");
     373                                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
     374                                                "Error sending ack");
     375                                return -1;
     376                        }
    409377                }
    410 
    411         }
    412         memcpy(buffer, RT_INFO->buf_current, len);
    413         RT_INFO->buf_current += len;
    414         RT_INFO->buf_filled -= len;
    415         return len;
    416 }
    417 
     378                to_write = to_write - numbytes;
     379                buf_ptr = buf_ptr + to_write;
     380               
     381        }
     382
     383        return 1;
     384}
    418385
    419386/* Sets the trace format for the packet to match the format it was originally
     
    506473}               
    507474
    508 /* Sends an RT ACK to the server to acknowledge receipt of packets */
    509 static int rt_send_ack(libtrace_t *libtrace,
    510                 uint32_t seqno)  {
    511        
    512         static char *ack_buffer = 0;
    513         char *buf_ptr;
    514         int numbytes = 0;
    515         size_t to_write = 0;
    516         rt_header_t *hdr;
    517         rt_ack_t *ack_hdr;
    518        
    519         if (!ack_buffer) {
    520                 ack_buffer = (char*)malloc(sizeof(rt_header_t)
    521                                                         + sizeof(rt_ack_t));
    522         }
    523        
    524         hdr = (rt_header_t *) ack_buffer;
    525         ack_hdr = (rt_ack_t *) (ack_buffer + sizeof(rt_header_t));
    526        
    527         hdr->type = htonl(TRACE_RT_ACK);
    528         hdr->length = htons(sizeof(rt_ack_t));
    529 
    530         ack_hdr->sequence = htonl(seqno);
    531        
    532         to_write = sizeof(rt_ack_t) + sizeof(rt_header_t);
    533         buf_ptr = ack_buffer;
    534 
    535         /* Keep trying until we write the entire ACK */
    536         while (to_write > 0) {
    537                 numbytes = send(RT_INFO->input_fd, buf_ptr, to_write, 0);
    538                 if (numbytes == -1) {
    539                         if (errno == EINTR || errno == EAGAIN) {
    540                                 continue;
    541                         }
    542                         else {
    543                                 printf("Error sending ack\n");
    544                                 perror("send");
    545                                 trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
    546                                                 "Error sending ack");
    547                                 return -1;
    548                         }
    549                 }
    550                 to_write = to_write - numbytes;
    551                 buf_ptr = buf_ptr + to_write;
    552                
    553         }
    554 
    555         return 1;
     475
     476/* I've upped this to 10K to deal with jumbo-grams that have not been snapped
     477 * in any way. This means we have a much larger memory overhead per packet
     478 * (which won't be used in the vast majority of cases), so we may want to think
     479 * about doing something smarter, e.g. allocate a smaller block of memory and
     480 * only increase it as required.
     481 *
     482 * XXX Capturing off int: can still lead to packets that are larger than 10K,
     483 * in instances where the fragmentation is done magically by the NIC. This
     484 * is pretty nasty, but also very rare.
     485 */
     486#define RT_BUF_SIZE (LIBTRACE_PACKET_BUFSIZE * 2)
     487
     488static int rt_process_data_packet(libtrace_t *libtrace,
     489                libtrace_packet_t *packet) {
     490
     491        uint32_t prep_flags = TRACE_PREP_DO_NOT_OWN_BUFFER;
     492        rt_header_t *hdr = (rt_header_t *)packet->header;
     493
     494        /* Send an ACK if required */
     495        if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
     496                RT_INFO->unacked ++;
     497                if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
     498                        if (rt_send_ack(libtrace, hdr->sequence) == -1)
     499                                return -1;
     500                        RT_INFO->unacked = 0;
     501                }
     502        }
     503
     504        /* Convert to the original capture format */
     505        if (rt_set_format(libtrace, packet) < 0) {
     506                return -1;
     507        }
     508
     509        /* Update payload pointers and packet type to match the original
     510         * format */
     511        if (trace_prepare_packet(packet->trace, packet, packet->payload,
     512                                packet->type, prep_flags)) {
     513                return -1;
     514        }
     515
     516        return 1;
     517
     518}
     519
     520/* Receives data from an RT server */
     521static int rt_read(libtrace_t *libtrace, int block) {
     522        int numbytes;
     523
     524        if (!RT_INFO->pkt_buffer) {
     525                RT_INFO->pkt_buffer = (char*)malloc((size_t)RT_BUF_SIZE);
     526                RT_INFO->buf_write = RT_INFO->pkt_buffer;
     527                RT_INFO->buf_read = RT_INFO->pkt_buffer;
     528                libtrace_create_new_bucket(RT_INFO->bucket, RT_INFO->pkt_buffer);
     529        }
     530
     531#ifndef MSG_DONTWAIT
     532#define MSG_DONTWAIT 0
     533#endif
     534#ifndef MSG_NOSIGNAL
     535#  define MSG_NOSIGNAL 0
     536#endif
     537
     538        if (block)
     539                block=0;
     540        else
     541                block=MSG_DONTWAIT;
     542
     543        /* If the current buffer has plenty of space left, we can continue to
     544         * read into it, otherwise create a new buffer and move anything in
     545         * the old buffer over to it */
     546        if (RT_INFO->buf_write - RT_INFO->pkt_buffer > RT_BUF_SIZE / 2) {
     547                char *newbuf = (char*)malloc((size_t)RT_BUF_SIZE);
     548
     549                memcpy(newbuf, RT_INFO->buf_read, RT_INFO->buf_write - RT_INFO->buf_read);
     550                RT_INFO->buf_write = newbuf + (RT_INFO->buf_write - RT_INFO->buf_read);
     551                RT_INFO->buf_read = newbuf;
     552                RT_INFO->pkt_buffer = newbuf;
     553                libtrace_create_new_bucket(RT_INFO->bucket, newbuf);
     554
     555        }
     556
     557        /* Attempt to fill the buffer as much as we can */
     558        if ((numbytes = recv(RT_INFO->input_fd, RT_INFO->buf_write,
     559                        RT_BUF_SIZE - (RT_INFO->buf_write-RT_INFO->pkt_buffer),
     560                        MSG_NOSIGNAL | block)) <= 0) {
     561
     562                if (numbytes == 0) {
     563                        trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
     564                                        "No data received by RT client");
     565                        return -1;
     566                }
     567
     568                if (errno == EINTR) {
     569                        /* Ignore EINTR in case a caller is using signals */
     570                        return 0;
     571                }
     572
     573                if (errno == EAGAIN) {
     574                        /* No data available and we are non-blocking */
     575                        trace_set_err(libtrace, EAGAIN, "EAGAIN");
     576                        return -1;
     577                }
     578
     579                trace_set_err(libtrace, TRACE_ERR_RT_FAILURE,
     580                                "Error reading from RT socket: %s",
     581                                strerror(errno));
     582                return -1;
     583        }
     584
     585        RT_INFO->buf_write += numbytes;
     586        return (RT_INFO->buf_write - RT_INFO->buf_read);
     587
     588}
     589
     590
     591static int rt_get_next_packet(libtrace_t *libtrace, libtrace_packet_t *packet,
     592                int block) {
     593
     594        rt_header_t *rthdr;
     595
     596        if (packet->buffer && packet->buf_control == TRACE_CTRL_PACKET)
     597                free(packet->buffer);
     598
     599        while (RT_INFO->buf_write - RT_INFO->buf_read <
     600                                (uint32_t)sizeof(rt_header_t)) {
     601                if (rt_read(libtrace, block) == -1)
     602                        return -1;
     603        }
     604
     605        rthdr = (rt_header_t *)RT_INFO->buf_read;
     606
     607        /* Check if we have enough payload */
     608        while (RT_INFO->buf_write - (RT_INFO->buf_read + sizeof(rt_header_t))
     609                        < ntohs(rthdr->length)) {
     610                if (rt_read(libtrace, block) == -1)
     611                        return -1;
     612                rthdr = (rt_header_t *)RT_INFO->buf_read;
     613        }
     614
     615
     616        packet->buffer = RT_INFO->buf_read;
     617        packet->header = RT_INFO->buf_read;
     618        packet->type = ntohl(((rt_header_t *)packet->header)->type);
     619        packet->payload = RT_INFO->buf_read + sizeof(rt_header_t);
     620        packet->internalid = libtrace_push_into_bucket(RT_INFO->bucket);
     621        assert(packet->internalid != 0);
     622        packet->srcbucket = RT_INFO->bucket;
     623        packet->buf_control = TRACE_CTRL_EXTERNAL;
     624
     625        RT_INFO->buf_read += ntohs(rthdr->length) + sizeof(rt_header_t);
     626
     627        if (packet->type >= TRACE_RT_DATA_SIMPLE) {
     628                rt_process_data_packet(libtrace, packet);
     629        } else {
     630                switch(packet->type) {
     631                        case TRACE_RT_DUCK_2_4:
     632                        case TRACE_RT_DUCK_2_5:
     633                        case TRACE_RT_STATUS:
     634                        case TRACE_RT_METADATA:
     635                                if (rt_process_data_packet(libtrace, packet) < 0)
     636                                        return -1;
     637                                break;
     638                        case TRACE_RT_END_DATA:
     639                        case TRACE_RT_KEYCHANGE:
     640                        case TRACE_RT_LOSTCONN:
     641                        case TRACE_RT_CLIENTDROP:
     642                        case TRACE_RT_SERVERSTART:
     643                                break;
     644                        case TRACE_RT_PAUSE_ACK:
     645                        case TRACE_RT_OPTION:
     646                                break;
     647                        default:
     648                                fprintf(stderr, "Bad RT type for client: %d\n",
     649                                                packet->type);
     650                                return -1;
     651                }
     652        }
     653
     654        return ntohs(rthdr->length);
     655
    556656}
    557657
     
    583683}       
    584684
    585 /* Reads the body of an RT packet from the network */
    586 static int rt_read_data_packet(libtrace_t *libtrace,
    587                 libtrace_packet_t *packet, int blocking) {
    588         uint32_t prep_flags = 0;
    589 
    590         prep_flags |= TRACE_PREP_OWN_BUFFER;
    591 
    592         /* The stored RT header will tell us how much data we need to read */
    593         if (rt_read(libtrace, packet->buffer, (size_t)RT_INFO->rt_hdr.length,
    594                                 blocking) != RT_INFO->rt_hdr.length) {
    595                 return -1;
    596         }
    597 
    598         /* Send an ACK if required */
    599         if (RT_INFO->reliable > 0 && packet->type >= TRACE_RT_DATA_SIMPLE) {
    600                 RT_INFO->unacked ++;
    601                 if (RT_INFO->unacked >= RT_ACK_FREQUENCY) {
    602                         if (rt_send_ack(libtrace, RT_INFO->rt_hdr.sequence)
    603                                         == -1)
    604                                 return -1;
    605                         RT_INFO->unacked = 0;
    606                 }
    607         }
    608        
    609         /* Convert to the original capture format */
    610         if (rt_set_format(libtrace, packet) < 0) {
    611                 return -1;
    612         }
    613                
    614         /* Update payload pointers and packet type to match the original
    615          * format */
    616         if (trace_prepare_packet(packet->trace, packet, packet->buffer,
    617                                 packet->type, prep_flags)) {
    618                 return -1;
    619         }
    620 
    621         return 0;
    622 }
    623 
    624 /* Reads an RT packet from the network. Will block if the "blocking" flag is
    625  * set to 1, otherwise will return if insufficient data is available */
    626 static int rt_read_packet_versatile(libtrace_t *libtrace,
    627                 libtrace_packet_t *packet,int blocking) {
    628         rt_header_t hdr;
    629         rt_header_t *pkt_hdr = NULL;
    630         libtrace_rt_types_t switch_type;
    631        
    632         /* RT_LAST indicates that we need to read the RT header for the next
    633          * packet. This is a touch hax, I admit */
    634         if (RT_INFO->rt_hdr.type == TRACE_RT_LAST) {
    635                 /* FIXME: Better error handling required */
    636                 if (rt_read(libtrace, (void *)&hdr,
    637                                 sizeof(rt_header_t),blocking) !=
    638                                 sizeof(rt_header_t)) {
    639                         return -1;
    640                 }
    641                 /* Need to store these in case the next rt_read overwrites
    642                  * the buffer they came from! */
    643                 RT_INFO->rt_hdr.type = ntohl(hdr.type);
    644                 RT_INFO->rt_hdr.length = ntohs(hdr.length);
    645                 RT_INFO->rt_hdr.sequence = ntohl(hdr.sequence);
    646         }
    647 
    648         if (packet->buf_control == TRACE_CTRL_PACKET) {
    649                 if (packet->buffer == NULL) {
    650                         packet->buffer = malloc(RT_INFO->rt_hdr.length + sizeof(rt_header_t));
    651 
    652                 } else if (RT_INFO->rt_hdr.length > sizeof(packet->buffer)) {
    653                         packet->buffer = realloc(packet->buffer, RT_INFO->rt_hdr.length + sizeof(rt_header_t));
    654                 }
    655         }
    656        
    657         packet->type = RT_INFO->rt_hdr.type;
    658         packet->payload = packet->buffer;
    659        
    660        
    661         /* All data-bearing packets (as opposed to RT internal messages)
    662          * should be treated the same way when it comes to reading the rest
    663          * of the packet */
    664         if (packet->type >= TRACE_RT_DATA_SIMPLE) {
    665                 switch_type = TRACE_RT_DATA_SIMPLE;
    666         } else {
    667                 switch_type = packet->type;
    668         }
    669 
    670         switch(switch_type) {
    671                 case TRACE_RT_DATA_SIMPLE:
    672                 case TRACE_RT_DUCK_2_4:
    673                 case TRACE_RT_DUCK_2_5:
    674                 case TRACE_RT_STATUS:
    675                 case TRACE_RT_METADATA:
    676                         if (rt_read_data_packet(libtrace, packet, blocking))
    677                                 return -1;
    678                         break;
    679                 case TRACE_RT_END_DATA:
    680                 case TRACE_RT_KEYCHANGE:
    681                 case TRACE_RT_LOSTCONN:
    682                 case TRACE_RT_CLIENTDROP:
    683                 case TRACE_RT_SERVERSTART:
    684                         /* All these have no payload */
    685                         packet->header = packet->buffer;
    686                         packet->payload = ((char *)packet->buffer + sizeof(rt_header_t));
    687                         pkt_hdr = (rt_header_t *)packet->header;
    688                         pkt_hdr->type = ntohl(RT_INFO->rt_hdr.type);
    689                         pkt_hdr->length = ntohs(RT_INFO->rt_hdr.length);
    690                         pkt_hdr->sequence = ntohl(RT_INFO->rt_hdr.sequence);
    691 
    692                         /* XXX Do we need to save the other crap? */
    693                         break;
    694                 case TRACE_RT_PAUSE_ACK:
    695                         /* XXX: Add support for this */
    696                         break;
    697                 case TRACE_RT_OPTION:
    698                         /* XXX: Add support for this */
    699                         break;
    700                 default:
    701                         printf("Bad rt type for client receipt: %d\n",
    702                                         switch_type);
    703                         return -1;
    704         }
    705        
    706         /* Return the number of bytes read from the stream */
    707         RT_INFO->rt_hdr.type = TRACE_RT_LAST;
    708         return RT_INFO->rt_hdr.length + sizeof(rt_header_t);
    709 }
    710 
    711685/* Reads the next available packet in a blocking fashion */
    712686static int rt_read_packet(libtrace_t *libtrace,
    713687                libtrace_packet_t *packet) {
    714         return rt_read_packet_versatile(libtrace,packet,1);
     688        return rt_get_next_packet(libtrace,packet,1);
    715689}
    716690
     
    801775        do {
    802776
    803                 event.size = rt_read_packet_versatile(trace, packet, 0);
     777                event.size = rt_get_next_packet(trace, packet, 0);
    804778                if (event.size == -1) {
    805779                        read_err = trace_get_err(trace);
     
    877851        rt_read_packet,                 /* read_packet */
    878852        rt_prepare_packet,              /* prepare_packet */
    879         NULL,                           /* fin_packet */
     853        NULL,                           /* fin_packet */
    880854        NULL,                           /* write_packet */
    881855        rt_get_link_type,               /* get_link_type */
  • lib/libtrace.h.in

    rb53d019 rd391ce0  
    548548        uint64_t hash; /**< A hash of the packet as supplied by the user */
    549549        int error; /**< The error status of pread_packet */
     550        uint64_t internalid;            /** Internal indentifier for the pkt */
     551        void *srcbucket;
    550552} libtrace_packet_t;
    551553
  • lib/libtrace_int.h

    r1101175 rd391ce0  
    155155#include "data-struct/linked_list.h"
    156156#include "data-struct/sliding_window.h"
     157#include "data-struct/buckets.h"
    157158#include "pthread_spinlock.h"
    158159
  • lib/trace.c

    r6fac5db rd391ce0  
    788788DLLEXPORT libtrace_packet_t *trace_copy_packet(const libtrace_packet_t *packet) {
    789789        libtrace_packet_t *dest =
    790                 (libtrace_packet_t *)malloc(sizeof(libtrace_packet_t));
     790                (libtrace_packet_t *)calloc((size_t)1, sizeof(libtrace_packet_t));
    791791        if (!dest) {
    792792                printf("Out of memory constructing packet\n");
     
    849849                        packet->trace->format->fin_packet(packet);
    850850                }
     851
     852                if (packet->srcbucket && packet->internalid != 0) {
     853                        libtrace_bucket_t *b = (libtrace_bucket_t *)packet->srcbucket;
     854                        libtrace_release_bucket_id(b, packet->internalid);
     855                }
     856
     857                pthread_mutex_lock(&packet->trace->libtrace_lock);
    851858                if (packet->trace && packet->trace->last_packet == packet)
    852859                        packet->trace->last_packet = NULL;
     860                pthread_mutex_unlock(&packet->trace->libtrace_lock);
    853861
    854862                // No matter what we remove the header and link pointers
     
    972980       
    973981        packet->trace = trace;
     982        pthread_mutex_lock(&trace->libtrace_lock);
    974983        trace->last_packet = packet;
     984        pthread_mutex_unlock(&trace->libtrace_lock);
    975985       
    976986        /* Clear packet cache */
Note: See TracChangeset for help on using the changeset viewer.