Changeset 6cf3ca0


Ignore:
Timestamp:
02/12/15 19:04:26 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
0b01fea
Parents:
771ab22
Message:

Gets the ring format back to a working state, the bulk of the refactoring
is now done.

I've opted to remove the inheritance way of grabbing shared functions
and replaced it with a file containing the common functions. Hopefully
this is more obvious that both int and ring depend on these.

I've also reworked the formats to be stream orientated, which removed
duplicates of heaps of functions. And allows the parallel and single
thread code to be almost identical.

After doing this many of the places where we had differences in
functions between ring and int disappeared.

I've also upped the MAX_ORDER to 11, used in allocating memory
from the kernel for the ring format.
Since this seems to work on the testing machines.
And we'll continue to fallback to smaller values if needed anyway.

Location:
lib
Files:
2 added
2 deleted
3 edited
1 moved

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    r771ab22 r6cf3ca0  
    66
    77extra_DIST = format_template.c
    8 NATIVEFORMATS=format_linux_ring.c format_linux.c
     8NATIVEFORMATS=format_linux_common.c format_linux_ring.c format_linux_int.c format_linux_common.h
    99BPFFORMATS=format_bpf.c
    1010
  • lib/format_linux_common.h

    r771ab22 r6cf3ca0  
    66 */
    77
     8#ifndef FORMAT_LINUX_COMMON_H
     9#define FORMAT_LINUX_COMMON_H
     10
    811#include "libtrace.h"
    912#include "libtrace_int.h"
     
    2326#include <fcntl.h>
    2427
    25 /* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel.
     28/* MAX_ORDER is defined in linux/mmzone.h. 11 is default for 3.0 kernels.
    2629 * max_order will be decreased by one if the ring buffer fails to allocate.
    27  * Used to get correct sized buffers from the kernel.
    28  */
    29 /* TODO: This is set to 11 in atleast the 3.x kernels. We should investigate
    30  * setting this higher to see if it improves performance. If not, then I guess
    31  * we can just leave it.
    32  */
    33 #define MAX_ORDER 10
    34 
    35 /* Cached page size, the page size shouldn't be changing */
    36 static int pagesize = 0;
    37 
     30 * Used to get the correct sized buffers from the kernel.
     31 */
     32#define MAX_ORDER 11
    3833/* Number of frames in the ring used by both TX and TR rings. More frames
    3934 * hopefully means less packet loss, especially if traffic comes in bursts.
     
    195190        /* Flag indicating whether the statistics are current or not */
    196191        int stats_valid;
    197         /* The actual format being used - ring vs int */
    198         libtrace_rt_types_t format;
    199192        /* The current ring buffer layout */
    200193        struct tpacket_req req;
     
    206199         * so we use a random here to try avoid collisions */
    207200        uint16_t fanout_group;
    208         /* Parent format so we can call parent functions */
    209         struct libtrace_format_t *parent_format;
    210201        /* When running in parallel mode this is malloc'd with an array
    211202         * file descriptors from packet fanout will use, here we assume/hope
     
    231222        /* Used to determine buffer size for the ring buffer */
    232223        uint32_t max_order;
    233         /* Parent format so we can call parent functions */
    234         struct libtrace_format_t *parent_format;
    235224};
    236225
     
    244233} ALIGN_STRUCT(CACHE_LINE_SIZE);
    245234
    246 #define ZERO_LINUX_STREAM {-1, NULL, 0}
     235#define ZERO_LINUX_STREAM {-1, MAP_FAILED, 0}
    247236
    248237
     
    277266#define FORMAT_DATA_OUT DATA_OUT(libtrace)
    278267
    279 #define PARENT FORMAT_DATA->parent_format
    280 #define PARENT_OUT FORMAT_DATA_OUT->parent_format
    281 
    282268#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
    283269#define FORMAT_DATA_FIRST ((struct linux_per_stream_t *)FORMAT_DATA_HEAD->data)
     
    287273        + TPACKET_ALIGN(sizeof(struct tpacket2_hdr))))
    288274
    289 /* TODO: Decide if inheritance is how we want to do this. Basically, ring is
    290  * a subclass of native and so it makes sense to reuse the native code where
    291  * possible. Moving ring into a new file really helps with readability, and
    292  * also helps us not carry so much ring data around in the native format. */
    293 struct libtrace_format_t *get_native_format(void);
    294 
    295 static inline libtrace_linktype_t get_libtrace_link_type(uint16_t linktype)
    296 {
    297         /* Convert the ARPHRD type into an appropriate libtrace link type */
    298         switch (linktype) {
    299                 case LIBTRACE_ARPHRD_ETHER:
    300                 case LIBTRACE_ARPHRD_LOOPBACK:
    301                         return TRACE_TYPE_ETH;
    302                 case LIBTRACE_ARPHRD_PPP:
    303                         return TRACE_TYPE_NONE;
    304                 case LIBTRACE_ARPHRD_IEEE80211_RADIOTAP:
    305                         return TRACE_TYPE_80211_RADIO;
    306                 case LIBTRACE_ARPHRD_IEEE80211:
    307                         return TRACE_TYPE_80211;
    308                 case LIBTRACE_ARPHRD_SIT:
    309                 case LIBTRACE_ARPHRD_NONE:
    310                         return TRACE_TYPE_NONE;
    311                 default: /* shrug, beyond me! */
    312                         printf("unknown Linux ARPHRD type 0x%04x\n",linktype);
    313                         return (libtrace_linktype_t)~0U;
    314         }
    315 }
    316 
    317 static inline libtrace_direction_t get_libtrace_direction(uint8_t pkttype)
    318 {
    319         switch (pkttype) {
    320                 case PACKET_OUTGOING:
    321                 case PACKET_LOOPBACK:
    322                         return TRACE_DIR_OUTGOING;
    323                 case PACKET_OTHERHOST:
    324                         return TRACE_DIR_OTHER;
    325                 default:
    326                         return TRACE_DIR_INCOMING;
    327         }
    328 }
    329 
    330 static libtrace_direction_t set_direction(struct sockaddr_ll * skadr,
    331                                           libtrace_direction_t direction)
    332 {
    333         switch (direction) {
    334                 case TRACE_DIR_OUTGOING:
    335                         skadr->sll_pkttype = PACKET_OUTGOING;
    336                         return TRACE_DIR_OUTGOING;
    337                 case TRACE_DIR_INCOMING:
    338                         skadr->sll_pkttype = PACKET_HOST;
    339                         return TRACE_DIR_INCOMING;
    340                 case TRACE_DIR_OTHER:
    341                         skadr->sll_pkttype = PACKET_OTHERHOST;
    342                         return TRACE_DIR_OTHER;
    343                 default:
    344                         return -1;
    345         }
    346 }
     275/* Common functions */
     276#ifdef HAVE_NETPACKET_PACKET_H
     277int linuxcommon_init_input(libtrace_t *libtrace);
     278int linuxcommon_init_output(libtrace_out_t *libtrace);
     279int linuxcommon_probe_filename(const char *filename);
     280int linuxcommon_config_input(libtrace_t *libtrace, trace_option_t option,
     281                             void *data);
     282void linuxcommon_close_input_stream(libtrace_t *libtrace,
     283                                    struct linux_per_stream_t *stream);
     284int linuxcommon_start_input_stream(libtrace_t *libtrace,
     285                                   struct linux_per_stream_t *stream);
     286inline int linuxcommon_to_packet_fanout(libtrace_t *libtrace,
     287                                        struct linux_per_stream_t *stream);
     288int linuxcommon_pause_input(libtrace_t *libtrace);
     289int linuxcommon_get_fd(const libtrace_t *libtrace);
     290int linuxcommon_fin_input(libtrace_t *libtrace);
     291int linuxcommon_pconfig_input(libtrace_t *libtrace,
     292                              trace_parallel_option_t option,
     293                              void *data);
     294int linuxcommon_pregister_thread(libtrace_t *libtrace,
     295                                 libtrace_thread_t *t,
     296                                 bool reading);
     297int linuxcommon_pstart_input(libtrace_t *libtrace,
     298                             int (*start_stream)(libtrace_t *, struct linux_per_stream_t*));
     299#endif /* HAVE_NETPACKET_PACKET_H */
     300
     301uint64_t linuxcommon_get_captured_packets(libtrace_t *libtrace);
     302uint64_t linuxcommon_get_filtered_packets(libtrace_t *libtrace);
     303uint64_t linuxcommon_get_dropped_packets(libtrace_t *libtrace);
     304inline libtrace_direction_t linuxcommon_get_direction(uint8_t pkttype);
     305inline libtrace_direction_t linuxcommon_set_direction(struct sockaddr_ll * skadr,
     306                                                 libtrace_direction_t direction);
     307inline libtrace_linktype_t linuxcommon_get_link_type(uint16_t linktype);
     308
     309
     310#endif /* FORMAT_LINUX_COMMON_H */
  • lib/format_linux_ring.c

    r1871afc r6cf3ca0  
    6161#endif
    6262
    63 #include "format_linux.h"
     63#include "format_linux_common.h"
    6464
    6565#ifdef HAVE_NETPACKET_PACKET_H
    6666/* Get current frame in the ring buffer*/
    67 #define GET_CURRENT_BUFFER(libtrace) \
    68         ((void *)FORMAT_DATA_FIRST->rx_ring +                           \
    69          (FORMAT_DATA_FIRST->rxring_offset *                            \
     67#define GET_CURRENT_BUFFER(libtrace, stream) \
     68        ((void *)stream->rx_ring +                              \
     69         (stream->rxring_offset *                               \
    7070          FORMAT_DATA->req.tp_frame_size))
    7171#endif
     
    7676#define TP_TRACE_START(mac, net, hdrend) \
    7777        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
     78/* Cached page size, the page size shouldn't be changing */
     79static int pagesize = 0;
    7880
    7981/*
     
    151153}
    152154
    153 static inline int socket_to_packetmmap( char * uridata, int ring_type,
     155static inline int socket_to_packetmmap(char * uridata, int ring_type,
    154156                                        int fd,
    155157                                        struct tpacket_req * req,
     
    176178         */
    177179        while(1) {
     180                fprintf(stderr, "max_order=%d\n", *max_order);
    178181                if (*max_order <= 0) {
    179182                        strncpy(error,
     
    199202                } else break;
    200203        }
     204        fprintf(stderr, "max_order=%d\n", *max_order);
    201205
    202206        /* Map the ring buffer into userspace */
     
    214218/* Release a frame back to the kernel or free() if it's a malloc'd buffer
    215219 */
    216 inline static void ring_release_frame(libtrace_t *libtrace,
     220inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
    217221                                      libtrace_packet_t *packet)
    218222{
     
    227231
    228232        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
    229                 struct linux_format_data_t *ftd = FORMAT_DATA;
    230 
     233                //struct linux_format_data_t *ftd = FORMAT_DATA;
    231234                /* Check it's within our buffer first - consider the pause
    232235                 * resume case it might have already been free'd lets hope we
     
    244247}
    245248
    246 static int linuxring_init_input(libtrace_t *libtrace)
    247 {
    248         /* Used to bootstrap the format data. We also save it later as it's very
    249          * useful to us. */
    250         struct libtrace_format_t *parent = get_native_format();
    251 
    252         /* This function will allocate the format data memory as well as
    253          * initialising all the native relevant fields. Anything native doesn't
    254          * touch will be touched here. */
    255         parent->init_input(libtrace);
    256 
    257         FORMAT_DATA->format = TRACE_RT_DATA_LINUX_RING;
    258         FORMAT_DATA->parent_format = parent;
    259 
    260         FORMAT_DATA_FIRST->rx_ring = NULL;
    261         FORMAT_DATA_FIRST->rxring_offset = 0;
    262         FORMAT_DATA->max_order = MAX_ORDER;
    263 
    264         /* If we haven't already, we need to update all the fields in our
    265          * format to point to the relevant fields in the parent format */
    266         if (libtrace->format->get_filtered_packets == NULL) {
    267 #ifdef HAVE_NETPACKET_PACKET_H
    268                 libtrace->format->probe_filename = parent->probe_filename;
    269                 libtrace->format->config_input = parent->config_input;
    270                 libtrace->format->fin_input = parent->fin_input;
    271 
    272                 libtrace->format->pstart_input = parent->pstart_input;
    273                 libtrace->format->ppause_input = parent->ppause_input;
    274                 libtrace->format->pfin_input = parent->fin_input;
    275                 libtrace->format->pconfig_input = parent->pconfig_input;
    276                 libtrace->format->pregister_thread = parent->pregister_thread;
    277 #endif
    278                 libtrace->format->get_filtered_packets =
    279                         parent->get_filtered_packets;
    280                 libtrace->format->get_dropped_packets =
    281                         parent->get_dropped_packets;
    282                 libtrace->format->get_captured_packets =
    283                         parent->get_captured_packets;
    284                 libtrace->format->get_fd = parent->get_fd;
    285         }
    286 
    287         return 0;
    288 }
    289 
    290 static int linuxring_start_input(libtrace_t *libtrace)
    291 {
     249static inline int linuxring_start_input_stream(libtrace_t *libtrace,
     250                                               struct linux_per_stream_t *stream) {
    292251        char error[2048];
    293252
    294253        /* We set the socket up the same and then convert it to PACKET_MMAP */
    295         if(PARENT->start_input(libtrace) != 0)
     254        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
    296255                return -1;
    297256
     
    300259        /* Make it a packetmmap */
    301260        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
    302                                 FORMAT_DATA_FIRST->fd,
    303                                 &FORMAT_DATA->req,
    304                                 &FORMAT_DATA_FIRST->rx_ring,
    305                                 &FORMAT_DATA->max_order,
    306                                 error) != 0) {
     261                                stream->fd,
     262                                &FORMAT_DATA->req,
     263                                &stream->rx_ring,
     264                                &FORMAT_DATA->max_order,
     265                                error) != 0) {
    307266                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
    308                               "Initialisation of packet MMAP failed: %s",
    309                               error);
    310                 close(FORMAT_DATA_FIRST->fd);
     267                              "Initialisation of packet MMAP failed: %s",
     268                              error);
     269                linuxcommon_close_input_stream(libtrace, stream);
     270                return -1;
     271        }
     272
     273        return 0;
     274}
     275
     276static int linuxring_start_input(libtrace_t *libtrace)
     277{
     278        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     279        if (ret != 0) {
     280                libtrace_list_deinit(FORMAT_DATA->per_stream);
    311281                free(libtrace->format_data);
    312282                libtrace->format_data = NULL;
     283        }
     284        return ret;
     285}
     286
     287static int linuxring_pstart_input(libtrace_t *libtrace) {
     288        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
     289}
     290
     291static int linuxring_start_output(libtrace_out_t *libtrace)
     292{
     293        char error[2048];
     294        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
     295        if (FORMAT_DATA_OUT->fd==-1) {
     296                free(FORMAT_DATA_OUT);
     297                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
    313298                return -1;
    314299        }
    315 
    316         return 0;
    317 }
    318 
    319 static int linuxring_pause_input(libtrace_t *libtrace)
    320 {
    321         munmap(FORMAT_DATA_FIRST->rx_ring,
    322                FORMAT_DATA->req.tp_block_size *
    323                FORMAT_DATA->req.tp_block_nr);
    324         FORMAT_DATA_FIRST->rx_ring = NULL;
    325         return PARENT->pause_input(libtrace);
    326 }
    327 
    328 static int linuxring_init_output(libtrace_out_t *libtrace)
    329 {
    330         /* Used to bootstrap the format data. We also save it later as it's very
    331          * useful to us. */
    332         struct libtrace_format_t *parent = get_native_format();
    333 
    334         /* This function will allocate the format data memory as well as
    335          * initialising all the native relevant fields. Anything native doesn't
    336          * touch will be touched here. */
    337         parent->init_output(libtrace);
    338 
    339         FORMAT_DATA_OUT->format = TRACE_FORMAT_LINUX_RING;
    340         FORMAT_DATA_OUT->parent_format = parent;
    341 
    342         /* If we haven't already, we need to update all the fields in our
    343          * format to point to the relevant fields in the parent format */
    344         if (libtrace->format->get_filtered_packets == NULL) {
    345 #ifdef HAVE_NETPACKET_PACKET_H
    346                 libtrace->format->probe_filename = parent->probe_filename;
    347                 libtrace->format->config_input = parent->config_input;
    348                 libtrace->format->fin_input = parent->fin_input;
    349 
    350                 libtrace->format->pstart_input = parent->pstart_input;
    351                 libtrace->format->ppause_input = parent->ppause_input;
    352                 libtrace->format->pfin_input = parent->fin_input;
    353                 libtrace->format->pconfig_input = parent->pconfig_input;
    354                 libtrace->format->pregister_thread = parent->pregister_thread;
    355 #endif
    356                 libtrace->format->get_filtered_packets =
    357                         parent->get_filtered_packets;
    358                 libtrace->format->get_dropped_packets =
    359                         parent->get_dropped_packets;
    360                 libtrace->format->get_captured_packets =
    361                         parent->get_captured_packets;
    362                 libtrace->format->get_fd = parent->get_fd;
    363         }
    364 
    365         return 0;
    366 }
    367 
    368 static int linuxring_start_output(libtrace_out_t *libtrace)
    369 {
    370         char error[2048];
    371         /* We set the socket up the same and then convert it to PACKET_MMAP */
    372         if (PARENT_OUT->start_output(libtrace) != 0)
    373                 return -1;
    374300
    375301        /* Make it a packetmmap */
     
    416342               FORMAT_DATA_OUT->req.tp_block_nr);
    417343
    418         return PARENT_OUT->fin_output(libtrace);
    419 }
    420 
    421 static int linuxring_read_packet(libtrace_t *libtrace,
    422                                  libtrace_packet_t *packet)
    423 {
    424         /* Read a single packet from the ring at FORMAT_DATA_FIRST */
    425         return -1;
    426 }
    427 
    428 static int linuxring_pread_packets(libtrace_t *libtrace,
    429                                    libtrace_thread_t *t,
    430                                    libtrace_packet_t **packets,
    431                                    size_t nb_packets)
    432 {
    433         struct linux_per_stream_t *stream_data =
    434                 (struct linux_per_stream_t *)t->format_data;
    435         return -1;
    436 }
    437 
    438 #if 0
    439 /* This is the way we used to do it. Might be useful to copy some of this
    440  * into the new code */
    441 inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) {
     344        /* Free the socket */
     345        close(FORMAT_DATA_OUT->fd);
     346        FORMAT_DATA_OUT->fd=-1;
     347        free(libtrace->format_data);
     348        return 0;
     349}
     350
     351static libtrace_linktype_t
     352linuxring_get_link_type(const struct libtrace_packet_t *packet)
     353{
     354        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
     355        return linuxcommon_get_link_type(linktype);
     356}
     357
     358static libtrace_direction_t
     359linuxring_get_direction(const struct libtrace_packet_t *packet) {
     360        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
     361                                         sll_pkttype);
     362}
     363
     364static libtrace_direction_t
     365linuxring_set_direction(libtrace_packet_t *packet,
     366                        libtrace_direction_t direction) {
     367        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
     368}
     369
     370static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
     371{
     372        struct timeval tv;
     373        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
     374        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
     375        return tv;
     376}
     377
     378static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
     379{
     380        struct timespec ts;
     381        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
     382        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
     383        return ts;
     384}
     385
     386static int linuxring_get_capture_length(const libtrace_packet_t *packet)
     387{
     388        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
     389}
     390
     391static int linuxring_get_wire_length(const libtrace_packet_t *packet)
     392{
     393        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
     394
     395        /* Include the missing FCS */
     396        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
     397                wirelen += 4;
     398
     399        return wirelen;
     400}
     401
     402static int linuxring_get_framing_length(const libtrace_packet_t *packet)
     403{
     404        /*
     405         * Need to make frame_length + capture_length = complete capture length
     406         * so include alignment whitespace. So reverse calculate from packet.
     407         */
     408        return (char *)packet->payload - (char *)packet->buffer;
     409}
     410
     411static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
     412                                           size_t size)
     413{
     414        assert(packet);
     415        if (size > trace_get_capture_length(packet)) {
     416                /* We should avoid making a packet larger */
     417                return trace_get_capture_length(packet);
     418        }
     419
     420        /* Reset the cached capture length */
     421        packet->capture_length = -1;
     422
     423        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
     424
     425        return trace_get_capture_length(packet);
     426}
     427
     428static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
     429                                    libtrace_packet_t *packet, void *buffer,
     430                                    libtrace_rt_types_t rt_type, uint32_t flags)
     431{
     432        if (packet->buffer != buffer &&
     433            packet->buf_control == TRACE_CTRL_PACKET) {
     434                free(packet->buffer);
     435        }
     436
     437        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
     438                packet->buf_control = TRACE_CTRL_PACKET;
     439        else
     440                packet->buf_control = TRACE_CTRL_EXTERNAL;
     441
     442
     443        packet->buffer = buffer;
     444        packet->header = buffer;
     445        packet->payload = (char *)buffer +
     446                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
     447                               TO_TP_HDR2(packet->header)->tp_net,
     448                               TPACKET2_HDRLEN);
     449        packet->type = rt_type;
     450
     451        return 0;
     452}
     453#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
     454inline static int linuxring_read_stream(libtrace_t *libtrace,
     455                                        libtrace_packet_t *packet,
     456                                        struct linux_per_stream_t *stream,
     457                                        libtrace_message_queue_t *queue) {
    442458
    443459        struct tpacket2_hdr *header;
    444460        int ret;
    445461        unsigned int snaplen;
    446        
     462        struct pollfd pollset[2];
     463
    447464        ring_release_frame(libtrace, packet);
    448465       
     
    451468       
    452469        /* Fetch the current frame */
    453         header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace);
     470        header = GET_CURRENT_BUFFER(libtrace, stream);
    454471        assert((((unsigned long) header) & (pagesize - 1)) == 0);
    455472
     
    459476         */
    460477        while (!(header->tp_status & TP_STATUS_USER)) {
    461                 if (message) {
    462                         struct pollfd pollset[2];
    463                         pollset[0].fd = fd;
    464                         pollset[0].events = POLLIN;
    465                         pollset[0].revents = 0;
    466                         pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages);
     478                pollset[0].fd = stream->fd;
     479                pollset[0].events = POLLIN;
     480                pollset[0].revents = 0;
     481                if (queue) {
     482                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
    467483                        pollset[1].events = POLLIN;
    468484                        pollset[1].revents = 0;
    469                         /* Wait for more data or a message*/
    470                         ret = poll(pollset, 2, -1);
    471                         if (ret < 0) {
    472                                 if (errno != EINTR)
    473                                         trace_set_err(libtrace,errno,"poll()");
    474                                 return -1;
    475                         }
    476                         /* A message is ready */
    477                         if (pollset[1].revents)
    478                                 return -2;
     485                }
     486                /* Wait for more data or a message*/
     487                ret = poll(pollset, (queue ? 2 : 1), -1);
     488                if (ret > 0) {
     489                        if (pollset[0].revents)
     490                                continue;
     491                        else
     492                                return READ_MESSAGE;
     493                } else if (ret < 0) {
     494                        if (errno != EINTR)
     495                        trace_set_err(libtrace,errno,"poll()");
     496                        return -1;
    479497                } else {
    480                         struct pollfd pollset;
    481                         pollset.fd = fd;
    482                         pollset.events = POLLIN;
    483                         pollset.revents = 0;
    484 
    485                         /* Wait for more data or a message*/
    486                         ret = poll(&pollset, 1, 500);
    487                         if (ret < 0) {
    488                                 if (errno != EINTR)
    489                                         trace_set_err(libtrace,errno,"poll()");
    490                                 return -1;
    491                         } else if (ret == 0) {
    492                                 /* Poll timed out - check if we should exit */
    493                                 if (libtrace_halt)
    494                                         return 0;
    495                                 continue;
    496                         }
     498                        /* Poll timed out - check if we should exit */
     499                        if (libtrace_halt)
     500                                return 0;
     501                        continue;
    497502                }
    498503        }
     
    505510        snaplen=LIBTRACE_MIN(
    506511                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
    507                         (int)FORMAT(libtrace->format_data)->snaplen);
     512                        (int)FORMAT_DATA->snaplen);
    508513       
    509514        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
    510515
    511516        /* Move to next buffer */
    512         (*rxring_offset)++;
    513         *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
     517        stream->rxring_offset++;
     518        stream->rxring_offset %= FORMAT_DATA->req.tp_frame_nr;
    514519
    515520        /* We just need to get prepare_packet to set all our packet pointers
     
    524529
    525530static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    526         int fd = FORMAT(libtrace->format_data)->fd;
    527         int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset;
    528         char *rx_ring = FORMAT(libtrace->format_data)->rx_ring;
    529         return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0);
     531        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
    530532}
    531533
    532534static int linuxring_pread_packets(libtrace_t *libtrace,
    533535                                   libtrace_thread_t *t,
    534                                    libtrace_packet_t **packets,
     536                                   libtrace_packet_t *packets[],
    535537                                   UNUSED size_t nb_packets) {
    536         //fprintf(stderr, "Thread number is #%d\n", t->perpkt_num);
    537         int fd = PERPKT_FORMAT(t)->fd;
    538         int *rxring_offset = &PERPKT_FORMAT(t)->rxring_offset;
    539         char *rx_ring = PERPKT_FORMAT(t)->rx_ring;
    540         packets[0]->error = linuxring_read_packet_fd(libtrace, packets[0], fd,
    541                                                      rxring_offset, rx_ring, 1);
     538        /* For now just read one packet */
     539        packets[0]->error = linuxring_read_stream(libtrace, packets[0],
     540                                                  t->format_data, &t->messages);
    542541        if (packets[0]->error >= 1)
    543542                return 1;
     
    545544                return packets[0]->error;
    546545}
    547 #endif /* 0 */
    548546
    549547/* Non-blocking read */
     
    559557
    560558        /* Fetch the current frame */
    561         header = GET_CURRENT_BUFFER(libtrace);
     559        header = GET_CURRENT_BUFFER(libtrace, FORMAT_DATA_FIRST);
    562560        if (header->tp_status & TP_STATUS_USER) {
    563561                /* We have a frame waiting */
     
    573571}
    574572
    575 static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
    576                                     libtrace_packet_t *packet, void *buffer,
    577                                     libtrace_rt_types_t rt_type, uint32_t flags)
    578 {
    579         if (packet->buffer != buffer &&
    580             packet->buf_control == TRACE_CTRL_PACKET) {
    581                 free(packet->buffer);
    582         }
    583 
    584         if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
    585                 packet->buf_control = TRACE_CTRL_PACKET;
    586         else
    587                 packet->buf_control = TRACE_CTRL_EXTERNAL;
    588 
    589 
    590         packet->buffer = buffer;
    591         packet->header = buffer;
    592         packet->payload = (char *)buffer +
    593                 TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
    594                                TO_TP_HDR2(packet->header)->tp_net,
    595                                TPACKET2_HDRLEN);
    596         packet->type = rt_type;
    597 
    598         return 0;
    599 }
    600 
    601573/**
    602574 * Free any resources being kept for this packet, Note: libtrace
     
    613585        /* If we own the packet (i.e. it's not a copy), we need to free it */
    614586        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
    615                 /* Started should always match the existence of the rx_ring */
     587                /* Started should always match the existence of the rx_ring
     588                 * in the parallel case still just check the first ring */
    616589                assert(!!FORMAT_DATA_FIRST->rx_ring ==
    617590                       !!packet->trace->started);
    618591                /* If we don't have a ring its already been destroyed */
    619                 if (FORMAT_DATA_FIRST->rx_ring)
     592                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
    620593                        ring_release_frame(packet->trace, packet);
    621594                else
     
    706679}
    707680
    708 static libtrace_linktype_t
    709 linuxring_get_link_type(const struct libtrace_packet_t *packet)
    710 {
    711         uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
    712         return get_libtrace_link_type(linktype);
    713 }
    714 
    715 static libtrace_direction_t
    716 linuxring_get_direction(const struct libtrace_packet_t *packet) {
    717         return get_libtrace_direction(GET_SOCKADDR_HDR(packet->buffer)->
    718                                       sll_pkttype);
    719 }
    720 
    721 static libtrace_direction_t
    722 linuxring_set_direction(libtrace_packet_t *packet,
    723                         libtrace_direction_t direction) {
    724         return set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
    725 }
    726 
    727 static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
    728 {
    729         struct timeval tv;
    730         tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
    731         tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
    732         return tv;
    733 }
    734 
    735 static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
    736 {
    737         struct timespec ts;
    738         ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
    739         ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
    740         return ts;
    741 }
    742 
    743 static int linuxring_get_capture_length(const libtrace_packet_t *packet)
    744 {
    745         return TO_TP_HDR2(packet->buffer)->tp_snaplen;
    746 }
    747 
    748 static int linuxring_get_wire_length(const libtrace_packet_t *packet)
    749 {
    750         int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
    751 
    752         /* Include the missing FCS */
    753         if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
    754                 wirelen += 4;
    755 
    756         return wirelen;
    757 }
    758 
    759 static int linuxring_get_framing_length(const libtrace_packet_t *packet)
    760 {
    761         /*
    762          * Need to make frame_length + capture_length = complete capture length
    763          * so include alligment whitespace. So reverse calculate from packet.
    764          */
    765         return (char *)packet->payload - (char *)packet->buffer;
    766 }
    767 
    768 static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
    769                                            size_t size)
    770 {
    771         assert(packet);
    772         if (size > trace_get_capture_length(packet)) {
    773                 /* We should avoid making a packet larger */
    774                 return trace_get_capture_length(packet);
    775         }
    776 
    777         /* Reset the cached capture length */
    778         packet->capture_length = -1;
    779 
    780         TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
    781 
    782         return trace_get_capture_length(packet);
    783 }
    784 
    785681#ifdef HAVE_NETPACKET_PACKET_H
    786682
     
    801697        "$Id$",
    802698        TRACE_FORMAT_LINUX_RING,
    803         NULL,                           /* probe filename */
     699        linuxcommon_probe_filename,     /* probe filename */
    804700        NULL,                           /* probe magic */
    805         linuxring_init_input,           /* init_input */
    806         NULL,                           /* config_input */
     701        linuxcommon_init_input,         /* init_input */
     702        linuxcommon_config_input,       /* config_input */
    807703        linuxring_start_input,          /* start_input */
    808         linuxring_pause_input,          /* pause_input */
    809         linuxring_init_output,          /* init_output */
     704        linuxcommon_pause_input,        /* pause_input */
     705        linuxcommon_init_output,        /* init_output */
    810706        NULL,                           /* config_output */
    811707        linuxring_start_output,         /* start_ouput */
    812         NULL,                           /* fin_input */
     708        linuxcommon_fin_input,          /* fin_input */
    813709        linuxring_fin_output,           /* fin_output */
    814710        linuxring_read_packet,          /* read_packet */
     
    831727        linuxring_set_capture_length,   /* set_capture_length */
    832728        NULL,                           /* get_received_packets */
    833         NULL,                           /* get_filtered_packets */
    834         NULL,                           /* get_dropped_packets */
    835         NULL,                           /* get_captured_packets */
    836         NULL,                           /* get_fd */
     729        linuxcommon_get_filtered_packets,/* get_filtered_packets */
     730        linuxcommon_get_dropped_packets,/* get_dropped_packets */
     731        linuxcommon_get_captured_packets,/* get_captured_packets */
     732        linuxcommon_get_fd,             /* get_fd */
    837733        linuxring_event,                /* trace_event */
    838         linuxring_help,                 /* help */
     734        linuxring_help,                 /* help */
    839735        NULL,                           /* next pointer */
    840736        {true, -1},                     /* Live, no thread limit */
    841         NULL,                           /* pstart_input */
     737        linuxring_pstart_input,         /* pstart_input */
    842738        linuxring_pread_packets,        /* pread_packets */
    843         NULL,                           /* ppause */
    844         NULL,                           /* p_fin */
    845         NULL,
    846         NULL,
     739        linuxcommon_pause_input,        /* ppause */
     740        linuxcommon_fin_input,          /* p_fin */
     741        linuxcommon_pconfig_input,      /* pconfig input */
     742        linuxcommon_pregister_thread,
    847743        NULL
    848744};
  • lib/libtrace_int.h

    rcb39d35 r6cf3ca0  
    11681168/** Constructor for the Linux Native format module */
    11691169void linuxnative_constructor(void);
     1170/** Constructor for the Linux Ring format module */
     1171void linuxring_constructor(void);
    11701172/** Constructor for the PCAP format module */
    11711173void pcap_constructor(void);
Note: See TracChangeset for help on using the changeset viewer.