source: lib/format_linux_ring.c @ 5e3f16c

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since 5e3f16c was 5e3f16c, checked in by Richard Sanger <rsanger@…>, 5 years ago

Fix for issue #39 - ring and int pstop() fails on older kernels when using threads

The problem here is that on old kernels without PACKET_FANOUT support
(added in v3.1) will only include the single threaded versions of int
and ring. When used with multiple threads the libtrace API will
fallback to using read rather than pread which does not check message
queues.

To fix this issue, in any format without pread support:

  • We check for new messages with each loop around read_packet as we fill the burst
  • Within read_packet we update the halt to include the pausing state
  • Use a seperate lock to the main lock when reading a burst of packets, otherwise trace_ppause has to wait for a burst to read.

This is not 100% perfect as a single packet might still need to be received
before a generic message can be received.
A proper fix in the future would be to move all format internals purely to the
parallel API.

  • Property mode set to 100644
File size: 23.6 KB
RevLine 
[1871afc]1/*
2 *
[ee6e802]3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
[1871afc]4 * All rights reserved.
5 *
[ee6e802]6 * This file is part of libtrace.
7 *
[1871afc]8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
[ee6e802]12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
[1871afc]14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
[ee6e802]19 * GNU Lesser General Public License for more details.
[1871afc]20 *
[ee6e802]21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
[1871afc]23 *
24 *
25 */
26
27/* This format module deals with using the Linux Ring capture format (also
28 * known as PACKET_MMAP).
29 *
30 * Linux Ring is a LIVE capture format.
31 *
32 * This format also supports writing which will write packets out to the
33 * network as a form of packet replay. This should not be confused with the
34 * RT protocol which is intended to transfer captured packet records between
35 * RT-speaking programs.
36 */
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include <stdlib.h>
44#include <errno.h>
45#include <unistd.h>
46#include <string.h>
47#include <assert.h>
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
[6cf3ca0]55#include "format_linux_common.h"
[1871afc]56
[9d89626]57/* Get the start of the captured data. I'm not sure if tp_mac (link layer) is
58 * always guaranteed. If it's not there then just use tp_net.
59 */
60#define TP_TRACE_START(mac, net, hdrend) \
61        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
62
[1871afc]63#ifdef HAVE_NETPACKET_PACKET_H
64/* Get current frame in the ring buffer*/
[e4f27d1]65#define GET_CURRENT_BUFFER(stream) \
[6cf3ca0]66        ((void *)stream->rx_ring +                              \
67         (stream->rxring_offset *                               \
[e4f27d1]68          stream->req.tp_frame_size))
[1871afc]69
[6cf3ca0]70/* Cached page size, the page size shouldn't be changing */
71static int pagesize = 0;
[1871afc]72
73/*
74 * Try figure out the best sizes for the ring buffer. Ensure that:
75 * - max(Block_size) == page_size << max_order
76 * - Frame_size == page_size << x (so that block_size%frame_size == 0)
77 *   This means that there will be no wasted space between blocks
78 * - Frame_size < block_size
79 * - Frame_size is as close as possible to LIBTRACE_PACKET_BUFSIZE, but not
80 *   bigger
81 * - Frame_nr = Block_nr * (frames per block)
82 * - CONF_RING_FRAMES is used a minimum number of frames to hold
83 * - Calculates based on max_order and buf_min
84 */
85static void calculate_buffers(struct tpacket_req * req, int fd, char * uri,
86                uint32_t max_order)
87{
88        struct ifreq ifr;
89        unsigned max_frame = LIBTRACE_PACKET_BUFSIZE;
90        pagesize = getpagesize();
91
92        strcpy(ifr.ifr_name, uri);
93        /* Don't bother trying to set frame size above mtu linux will drop
94         * these anyway.
95         *
96         * Remember, that our frame also has to include a TPACKET header!
97         */
98        if (ioctl(fd, SIOCGIFMTU, (caddr_t)&ifr) >= 0)
99                max_frame = ifr.ifr_mtu + TPACKET_ALIGN(TPACKET2_HDRLEN);
100        if (max_frame > LIBTRACE_PACKET_BUFSIZE)
101                max_frame = LIBTRACE_PACKET_BUFSIZE;
102
103        /* Calculate frame size */
104        req->tp_frame_size = pagesize;
105        while (req->tp_frame_size < max_frame &&
106              req->tp_frame_size < LIBTRACE_PACKET_BUFSIZE) {
107                req->tp_frame_size <<= 1;
108        }
109        if (req->tp_frame_size > LIBTRACE_PACKET_BUFSIZE)
110                req->tp_frame_size >>= 1;
111
112        /* Calculate block size */
113        req->tp_block_size = pagesize << max_order;
[e4f27d1]114        /* If max order is too high this might become 0 */
115        if (req->tp_block_size == 0) {
116                calculate_buffers(req, fd, uri, max_order-1);
117                return;
118        }
[1871afc]119        do {
120                req->tp_block_size >>= 1;
121        } while ((CONF_RING_FRAMES * req->tp_frame_size) <= req->tp_block_size);
122        req->tp_block_size <<= 1;
123
124        /* Calculate number of blocks */
125        req->tp_block_nr = (CONF_RING_FRAMES * req->tp_frame_size)
126                / req->tp_block_size;
127        if((CONF_RING_FRAMES * req->tp_frame_size) % req->tp_block_size != 0)
128                req->tp_block_nr++;
129
130        /* Calculate packets such that we use all the space we have to
131         * allocated */
132        req->tp_frame_nr = req->tp_block_nr *
133                (req->tp_block_size / req->tp_frame_size);
134
135        /*
136        printf("MaxO 0x%x BS 0x%x BN 0x%x FS 0x%x FN 0x%x\n",
137                max_order,
138                req->tp_block_size,
139                req->tp_block_nr,
140                req->tp_frame_size,
141                req->tp_frame_nr);
142        */
143
144        /* In case we have some silly values*/
145        assert(req->tp_block_size);
146        assert(req->tp_block_nr);
147        assert(req->tp_frame_size);
148        assert(req->tp_frame_nr);
149        assert(req->tp_block_size % req->tp_frame_size == 0);
150}
151
[6cf3ca0]152static inline int socket_to_packetmmap(char * uridata, int ring_type,
[1871afc]153                                        int fd,
154                                        struct tpacket_req * req,
155                                        char ** ring_location,
156                                        uint32_t *max_order,
157                                        char *error) {
158        int val;
159
160        /* Switch to TPACKET header version 2, we only try support v2 because
161         * v1 had problems with data type consistancy */
162        val = TPACKET_V2;
163        if (setsockopt(fd,
164                       SOL_PACKET,
165                       PACKET_VERSION,
166                       &val,
167                       sizeof(val)) == -1) {
168                strncpy(error, "TPACKET2 not supported", 2048);
169                return -1;
170        }
171
172        /* Try switch to a ring buffer. If it fails we assume the the kernel
173         * cannot allocate a block of that size, so decrease max_block and
174         * retry.
175         */
176        while(1) {
177                if (*max_order <= 0) {
178                        strncpy(error,
179                                "Cannot allocate enough memory for ring buffer",
180                                2048);
181                        return -1;
182                }
183                calculate_buffers(req, fd, uridata, *max_order);
184                if (setsockopt(fd,
185                               SOL_PACKET,
186                               ring_type,
187                               req,
188                               sizeof(struct tpacket_req)) == -1) {
189                        if(errno == ENOMEM) {
190                                (*max_order)--;
191                        } else {
192                                strncpy(error,
193                                        "Error setting the ring buffer size",
194                                        2048);
195                                return -1;
196                        }
197
198                } else break;
199        }
200
201        /* Map the ring buffer into userspace */
202        *ring_location = mmap(NULL,
203                              req->tp_block_size * req->tp_block_nr,
204                              PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
205        if(*ring_location == MAP_FAILED) {
206                strncpy(error, "Failed to map memory for ring buffer", 2048);
207                return -1;
208        }
209
210        return 0;
211}
212
213/* Release a frame back to the kernel or free() if it's a malloc'd buffer
214 */
[6cf3ca0]215inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
[1871afc]216                                      libtrace_packet_t *packet)
217{
218        /* Free the old packet */
219        if(packet->buffer == NULL)
220                return;
221
222        if(packet->buf_control == TRACE_CTRL_PACKET){
223                free(packet->buffer);
224                packet->buffer = NULL;
225        }
226
227        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
[6cf3ca0]228                //struct linux_format_data_t *ftd = FORMAT_DATA;
[1871afc]229                /* Check it's within our buffer first - consider the pause
230                 * resume case it might have already been free'd lets hope we
231                 * get another buffer */
232                // TODO: For now let any one free anything
233                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
234                                (char *) ftd->rx_ring,
235                                ftd->rx_ring +
236                                ftd->req.tp_block_size *
237                                ftd->req.tp_block_nr)){*/
238                TO_TP_HDR2(packet->buffer)->tp_status = 0;
239                packet->buffer = NULL;
240                /*}*/
241        }
242}
243
[6cf3ca0]244static inline int linuxring_start_input_stream(libtrace_t *libtrace,
245                                               struct linux_per_stream_t *stream) {
[1871afc]246        char error[2048];
247
248        /* We set the socket up the same and then convert it to PACKET_MMAP */
[6cf3ca0]249        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
[1871afc]250                return -1;
251
252        strncpy(error, "No known error", 2048);
253
254        /* Make it a packetmmap */
255        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
[6cf3ca0]256                                stream->fd,
[e4f27d1]257                                &stream->req,
[6cf3ca0]258                                &stream->rx_ring,
259                                &FORMAT_DATA->max_order,
260                                error) != 0) {
[1871afc]261                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
[6cf3ca0]262                              "Initialisation of packet MMAP failed: %s",
263                              error);
264                linuxcommon_close_input_stream(libtrace, stream);
[1871afc]265                return -1;
266        }
267
268        return 0;
269}
270
[6cf3ca0]271static int linuxring_start_input(libtrace_t *libtrace)
[1871afc]272{
[6cf3ca0]273        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
274        return ret;
275}
[1871afc]276
[6d1c2c0]277#ifdef HAVE_PACKET_FANOUT
[6cf3ca0]278static int linuxring_pstart_input(libtrace_t *libtrace) {
279        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
[1871afc]280}
[6d1c2c0]281#endif
[1871afc]282
283static int linuxring_start_output(libtrace_out_t *libtrace)
284{
285        char error[2048];
[6cf3ca0]286        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
287        if (FORMAT_DATA_OUT->fd==-1) {
288                free(FORMAT_DATA_OUT);
289                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
[1871afc]290                return -1;
[6cf3ca0]291        }
[1871afc]292
293        /* Make it a packetmmap */
294        if(socket_to_packetmmap(libtrace->uridata, PACKET_TX_RING,
295                                FORMAT_DATA_OUT->fd,
296                                &FORMAT_DATA_OUT->req,
297                                &FORMAT_DATA_OUT->tx_ring,
298                                &FORMAT_DATA_OUT->max_order,
299                                error) != 0) {
300                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED,
301                                  "Initialisation of packet MMAP failed: %s",
302                                  error);
303                close(FORMAT_DATA_OUT->fd);
304                free(FORMAT_DATA_OUT);
305                libtrace->format_data = NULL;
306                return -1;
307        }
308
309        FORMAT_DATA_OUT->sock_hdr.sll_family = AF_PACKET;
310        FORMAT_DATA_OUT->sock_hdr.sll_protocol = 0;
311        FORMAT_DATA_OUT->sock_hdr.sll_ifindex =
312                if_nametoindex(libtrace->uridata);
313        FORMAT_DATA_OUT->sock_hdr.sll_hatype = 0;
314        FORMAT_DATA_OUT->sock_hdr.sll_pkttype = 0;
315        FORMAT_DATA_OUT->sock_hdr.sll_halen = 0;
316        FORMAT_DATA_OUT->queue = 0;
317
318        return 0;
319}
320
321static int linuxring_fin_output(libtrace_out_t *libtrace)
322{
323        /* Make sure any remaining frames get sent */
324        sendto(FORMAT_DATA_OUT->fd,
325               NULL,
326               0,
327               0,
328               (void *) &FORMAT_DATA_OUT->sock_hdr,
329               sizeof(FORMAT_DATA_OUT->sock_hdr));
330
331        /* Unmap our data area */
332        munmap(FORMAT_DATA_OUT->tx_ring,
333               FORMAT_DATA_OUT->req.tp_block_size *
334               FORMAT_DATA_OUT->req.tp_block_nr);
335
[6cf3ca0]336        /* Free the socket */
337        close(FORMAT_DATA_OUT->fd);
338        FORMAT_DATA_OUT->fd=-1;
339        free(libtrace->format_data);
340        return 0;
[1871afc]341}
[9d89626]342#endif /* HAVE_NETPACKET_PACKET_H */
[1871afc]343
[6cf3ca0]344static libtrace_linktype_t
345linuxring_get_link_type(const struct libtrace_packet_t *packet)
[1871afc]346{
[6cf3ca0]347        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
348        return linuxcommon_get_link_type(linktype);
[1871afc]349}
350
[6cf3ca0]351static libtrace_direction_t
352linuxring_get_direction(const struct libtrace_packet_t *packet) {
353        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
354                                         sll_pkttype);
355}
356
357static libtrace_direction_t
358linuxring_set_direction(libtrace_packet_t *packet,
359                        libtrace_direction_t direction) {
360        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
361}
362
363static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
[1871afc]364{
[6cf3ca0]365        struct timeval tv;
366        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
367        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
368        return tv;
[1871afc]369}
370
[6cf3ca0]371static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
372{
373        struct timespec ts;
374        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
375        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
376        return ts;
377}
378
379static int linuxring_get_capture_length(const libtrace_packet_t *packet)
380{
381        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
382}
383
384static int linuxring_get_wire_length(const libtrace_packet_t *packet)
385{
386        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
387
388        /* Include the missing FCS */
389        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
390                wirelen += 4;
391
392        return wirelen;
393}
394
395static int linuxring_get_framing_length(const libtrace_packet_t *packet)
396{
397        /*
398         * Need to make frame_length + capture_length = complete capture length
399         * so include alignment whitespace. So reverse calculate from packet.
400         */
401        return (char *)packet->payload - (char *)packet->buffer;
402}
403
404static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
405                                           size_t size)
406{
407        assert(packet);
408        if (size > trace_get_capture_length(packet)) {
409                /* We should avoid making a packet larger */
410                return trace_get_capture_length(packet);
411        }
412
413        /* Reset the cached capture length */
414        packet->capture_length = -1;
415
416        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
417
418        return trace_get_capture_length(packet);
419}
420
421static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
422                                    libtrace_packet_t *packet, void *buffer,
423                                    libtrace_rt_types_t rt_type, uint32_t flags)
424{
425        if (packet->buffer != buffer &&
426            packet->buf_control == TRACE_CTRL_PACKET) {
427                free(packet->buffer);
428        }
429
430        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
431                packet->buf_control = TRACE_CTRL_PACKET;
432        else
433                packet->buf_control = TRACE_CTRL_EXTERNAL;
434
435
436        packet->buffer = buffer;
437        packet->header = buffer;
438        packet->payload = (char *)buffer +
439                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
440                               TO_TP_HDR2(packet->header)->tp_net,
441                               TPACKET2_HDRLEN);
442        packet->type = rt_type;
443
444        return 0;
445}
[9d89626]446
447#ifdef HAVE_NETPACKET_PACKET_H
[6cf3ca0]448#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
449inline static int linuxring_read_stream(libtrace_t *libtrace,
450                                        libtrace_packet_t *packet,
451                                        struct linux_per_stream_t *stream,
452                                        libtrace_message_queue_t *queue) {
[1871afc]453
454        struct tpacket2_hdr *header;
455        int ret;
456        unsigned int snaplen;
[6cf3ca0]457        struct pollfd pollset[2];
458
[1871afc]459        ring_release_frame(libtrace, packet);
460       
461        packet->buf_control = TRACE_CTRL_EXTERNAL;
462        packet->type = TRACE_RT_DATA_LINUX_RING;
463       
464        /* Fetch the current frame */
[e4f27d1]465        header = GET_CURRENT_BUFFER(stream);
[1871afc]466        assert((((unsigned long) header) & (pagesize - 1)) == 0);
467
468        /* TP_STATUS_USER means that we can use the frame.
469         * When a slot does not have this flag set, the frame is not
470         * ready for consumption.
471         */
472        while (!(header->tp_status & TP_STATUS_USER)) {
[6cf3ca0]473                pollset[0].fd = stream->fd;
474                pollset[0].events = POLLIN;
475                pollset[0].revents = 0;
476                if (queue) {
477                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
[1871afc]478                        pollset[1].events = POLLIN;
479                        pollset[1].revents = 0;
[6cf3ca0]480                }
[1ebc4bd]481                /* Wait for more data or a message */
482                ret = poll(pollset, (queue ? 2 : 1), 500);
[6cf3ca0]483                if (ret > 0) {
[1ebc4bd]484                        if (pollset[0].revents == POLLIN)
[1871afc]485                                continue;
[1ebc4bd]486                        else if (queue && pollset[1].revents == POLLIN)
[6cf3ca0]487                                return READ_MESSAGE;
[1ebc4bd]488                        else if (queue && pollset[1].revents) {
489                                /* Internal error */
490                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
491                                              "Message queue error %d poll()",
492                                              pollset[1].revents);
493                                return READ_ERROR;
494                        } else {
[3e89670]495                                /* Try get the error from the socket */
496                                int err = ENETDOWN;
497                                socklen_t len = sizeof(err);
498                                getsockopt(stream->fd, SOL_SOCKET, SO_ERROR,
499                                           &err, &len);
500                                trace_set_err(libtrace, err,
[1ebc4bd]501                                              "Socket error revents=%d poll()",
502                                              pollset[0].revents);
503                                return READ_ERROR;
504                        }
[6cf3ca0]505                } else if (ret < 0) {
[1ebc4bd]506                        if (errno != EINTR) {
507                                trace_set_err(libtrace,errno,"poll()");
508                                return -1;
509                        }
[6cf3ca0]510                } else {
511                        /* Poll timed out - check if we should exit */
[5e3f16c]512                        if ((ret=is_halted(libtrace)) != -1)
513                                return ret;
[6cf3ca0]514                        continue;
[1871afc]515                }
516        }
517
518        packet->buffer = header;
[10c47a0]519        packet->trace = libtrace;
[1871afc]520
521        /* If a snaplen was configured, automatically truncate the packet to
522         * the desired length.
523         */
524        snaplen=LIBTRACE_MIN(
525                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
[6cf3ca0]526                        (int)FORMAT_DATA->snaplen);
[1871afc]527       
528        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
529
530        /* Move to next buffer */
[6cf3ca0]531        stream->rxring_offset++;
[e4f27d1]532        stream->rxring_offset %= stream->req.tp_frame_nr;
[1871afc]533
534        /* We just need to get prepare_packet to set all our packet pointers
535         * appropriately */
536        if (linuxring_prepare_packet(libtrace, packet, packet->buffer,
537                                packet->type, 0))
538                return -1;
539        return  linuxring_get_framing_length(packet) + 
540                                linuxring_get_capture_length(packet);
541
542}
543
544static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
[6cf3ca0]545        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
[1871afc]546}
547
[6d1c2c0]548#ifdef HAVE_PACKET_FANOUT
[1871afc]549static int linuxring_pread_packets(libtrace_t *libtrace,
550                                   libtrace_thread_t *t,
[6cf3ca0]551                                   libtrace_packet_t *packets[],
[1871afc]552                                   UNUSED size_t nb_packets) {
[6cf3ca0]553        /* For now just read one packet */
554        packets[0]->error = linuxring_read_stream(libtrace, packets[0],
555                                                  t->format_data, &t->messages);
[1871afc]556        if (packets[0]->error >= 1)
557                return 1;
558        else
559                return packets[0]->error;
560}
[6d1c2c0]561#endif
[1871afc]562
563/* Non-blocking read */
564static libtrace_eventobj_t linuxring_event(libtrace_t *libtrace,
565                                           libtrace_packet_t *packet)
566{
567        struct tpacket2_hdr *header;
568        libtrace_eventobj_t event = {0,0,0.0,0};
569
570        /* We must free the old packet, otherwise select() will instantly
571         * return */
572        ring_release_frame(libtrace, packet);
573
574        /* Fetch the current frame */
[e4f27d1]575        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
[1871afc]576        if (header->tp_status & TP_STATUS_USER) {
577                /* We have a frame waiting */
578                event.size = trace_read_packet(libtrace, packet);
579                event.type = TRACE_EVENT_PACKET;
580        } else {
581                /* Ok we don't have a packet waiting */
582                event.type = TRACE_EVENT_IOWAIT;
583                event.fd = FORMAT_DATA_FIRST->fd;
584        }
585
586        return event;
587}
588
589/**
590 * Free any resources being kept for this packet, Note: libtrace
591 * will ensure all fields are zeroed correctly.
592 */
593static void linuxring_fin_packet(libtrace_packet_t *packet)
594{
595        libtrace_t *libtrace = packet->trace;
596
597        if (packet->buffer == NULL)
598                return;
599        assert(packet->trace);
600
601        /* If we own the packet (i.e. it's not a copy), we need to free it */
602        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
[6cf3ca0]603                /* Started should always match the existence of the rx_ring
604                 * in the parallel case still just check the first ring */
[1871afc]605                assert(!!FORMAT_DATA_FIRST->rx_ring ==
606                       !!packet->trace->started);
607                /* If we don't have a ring its already been destroyed */
[6cf3ca0]608                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
[1871afc]609                        ring_release_frame(packet->trace, packet);
610                else
611                        packet->buffer = NULL;
612        }
613}
614
615static int linuxring_write_packet(libtrace_out_t *libtrace,
616                                  libtrace_packet_t *packet)
617{
618        struct tpacket2_hdr *header;
619        struct pollfd pollset;
620        struct socket_addr;
621        int ret;
622        unsigned max_size;
623        void * off;
624
625        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
626                return 0;
627
628        max_size = FORMAT_DATA_OUT->req.tp_frame_size -
629                TPACKET2_HDRLEN + sizeof(struct sockaddr_ll);
630
631        header = (void *)FORMAT_DATA_OUT->tx_ring +
632                (FORMAT_DATA_OUT->txring_offset *
633                 FORMAT_DATA_OUT->req.tp_frame_size);
634
635        while(header->tp_status != TP_STATUS_AVAILABLE) {
636                /* if none available: wait on more data */
637                pollset.fd = FORMAT_DATA_OUT->fd;
638                pollset.events = POLLOUT;
639                pollset.revents = 0;
640                ret = poll(&pollset, 1, 1000);
641                if (ret < 0 && errno != EINTR) {
642                        perror("poll");
643                        return -1;
644                }
[000726a]645                if(ret == 0) {
[1871afc]646                        /* Timeout something has gone wrong - maybe the queue is
647                         * to large so try issue another send command
648                         */
649                        ret = sendto(FORMAT_DATA_OUT->fd,
650                                     NULL,
651                                     0,
652                                     0,
653                                     (void *)&FORMAT_DATA_OUT->sock_hdr,
654                                     sizeof(FORMAT_DATA_OUT->sock_hdr));
655                        if (ret < 0) {
656                                trace_set_err_out(libtrace, errno,
657                                                  "sendto after timeout "
658                                                  "failed");
659                                return -1;
660                        }
[000726a]661                }
[1871afc]662        }
663
664        header->tp_len = trace_get_capture_length(packet);
665
666        /* We cannot write the whole packet so just write part of it */
667        if (header->tp_len > max_size)
668                header->tp_len = max_size;
669
670        /* Fill packet - no sockaddr_ll in header when writing to the TX_RING */
671        off = ((void *)header) + (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll));
672        memcpy(off, (char *)packet->payload, header->tp_len);
673
674        /* 'Send it' and increase ring pointer to the next frame */
675        header->tp_status = TP_STATUS_SEND_REQUEST;
676        FORMAT_DATA_OUT->txring_offset = (FORMAT_DATA_OUT->txring_offset + 1) %
677                FORMAT_DATA_OUT->req.tp_frame_nr;
678
679        /* Notify kernel there are frames to send */
680        FORMAT_DATA_OUT->queue ++;
681        FORMAT_DATA_OUT->queue %= TX_MAX_QUEUE;
682        if(FORMAT_DATA_OUT->queue == 0){
683                ret = sendto(FORMAT_DATA_OUT->fd,
684                                NULL,
685                                0,
686                                MSG_DONTWAIT,
687                                (void *)&FORMAT_DATA_OUT->sock_hdr,
688                                sizeof(FORMAT_DATA_OUT->sock_hdr));
689                if (ret < 0) {
690                        trace_set_err_out(libtrace, errno, "sendto failed");
691                        return -1;
692                }
693        }
694        return header->tp_len;
695
696}
697
698static void linuxring_help(void)
699{
700        printf("linuxring format module: $Revision: 1793 $\n");
701        printf("Supported input URIs:\n");
702        printf("\tring:eth0\n");
703        printf("\n");
704        printf("Supported output URIs:\n");
705        printf("\tring:eth0\n");
706        printf("\n");
707        return;
708}
709
710static struct libtrace_format_t linuxring = {
711        "ring",
712        "$Id$",
713        TRACE_FORMAT_LINUX_RING,
[6cf3ca0]714        linuxcommon_probe_filename,     /* probe filename */
[1871afc]715        NULL,                           /* probe magic */
[6cf3ca0]716        linuxcommon_init_input,         /* init_input */
717        linuxcommon_config_input,       /* config_input */
[1871afc]718        linuxring_start_input,          /* start_input */
[6cf3ca0]719        linuxcommon_pause_input,        /* pause_input */
720        linuxcommon_init_output,        /* init_output */
[1871afc]721        NULL,                           /* config_output */
722        linuxring_start_output,         /* start_ouput */
[6cf3ca0]723        linuxcommon_fin_input,          /* fin_input */
[1871afc]724        linuxring_fin_output,           /* fin_output */
725        linuxring_read_packet,          /* read_packet */
726        linuxring_prepare_packet,       /* prepare_packet */
727        linuxring_fin_packet,           /* fin_packet */
728        linuxring_write_packet,         /* write_packet */
729        linuxring_get_link_type,        /* get_link_type */
730        linuxring_get_direction,        /* get_direction */
731        linuxring_set_direction,        /* set_direction */
732        NULL,                           /* get_erf_timestamp */
733        linuxring_get_timeval,          /* get_timeval */
734        linuxring_get_timespec,         /* get_timespec */
735        NULL,                           /* get_seconds */
736        NULL,                           /* seek_erf */
737        NULL,                           /* seek_timeval */
738        NULL,                           /* seek_seconds */
739        linuxring_get_capture_length,   /* get_capture_length */
740        linuxring_get_wire_length,      /* get_wire_length */
741        linuxring_get_framing_length,   /* get_framing_length */
742        linuxring_set_capture_length,   /* set_capture_length */
743        NULL,                           /* get_received_packets */
[5ab626a]744        NULL,                           /* get_filtered_packets */
745        NULL,                           /* get_dropped_packets */
746        linuxcommon_get_statistics,     /* get_statistics */
[6cf3ca0]747        linuxcommon_get_fd,             /* get_fd */
[1871afc]748        linuxring_event,                /* trace_event */
[6cf3ca0]749        linuxring_help,                 /* help */
[1871afc]750        NULL,                           /* next pointer */
[babeb70]751#ifdef HAVE_PACKET_FANOUT
[1871afc]752        {true, -1},                     /* Live, no thread limit */
[6cf3ca0]753        linuxring_pstart_input,         /* pstart_input */
[1871afc]754        linuxring_pread_packets,        /* pread_packets */
[6cf3ca0]755        linuxcommon_pause_input,        /* ppause */
756        linuxcommon_fin_input,          /* p_fin */
[5ab626a]757        linuxcommon_pregister_thread,   /* register thread */
758        NULL,                           /* unregister thread */
759        NULL                            /* get thread stats */
[babeb70]760#else
761        NON_PARALLEL(true)
762#endif
[1871afc]763};
[9d89626]764#else /* HAVE_NETPACKET_PACKET_H */
[1871afc]765
766static void linuxring_help(void)
767{
768        printf("linuxring format module: $Revision: 1793 $\n");
769        printf("Not supported on this host\n");
770}
771
772static struct libtrace_format_t linuxring = {
773        "ring",
774        "$Id$",
775        TRACE_FORMAT_LINUX_RING,
776        NULL,                           /* probe filename */
777        NULL,                           /* probe magic */
778        NULL,                           /* init_input */
779        NULL,                           /* config_input */
780        NULL,                           /* start_input */
781        NULL,                           /* pause_input */
782        NULL,                           /* init_output */
783        NULL,                           /* config_output */
784        NULL,                           /* start_ouput */
785        NULL,                           /* fin_input */
786        NULL,                           /* fin_output */
787        NULL,                           /* read_packet */
788        linuxring_prepare_packet,       /* prepare_packet */
789        NULL,                           /* fin_packet */
790        NULL,                           /* write_packet */
791        linuxring_get_link_type,        /* get_link_type */
792        linuxring_get_direction,        /* get_direction */
793        linuxring_set_direction,        /* set_direction */
794        NULL,                           /* get_erf_timestamp */
795        linuxring_get_timeval,          /* get_timeval */
796        linuxring_get_timespec,         /* get_timespec */
797        NULL,                           /* get_seconds */
798        NULL,                           /* seek_erf */
799        NULL,                           /* seek_timeval */
800        NULL,                           /* seek_seconds */
801        linuxring_get_capture_length,   /* get_capture_length */
802        linuxring_get_wire_length,      /* get_wire_length */
803        linuxring_get_framing_length,   /* get_framing_length */
804        linuxring_set_capture_length,   /* set_capture_length */
805        NULL,                           /* get_received_packets */
806        NULL,                           /* get_filtered_packets */
807        NULL,                           /* get_dropped_packets */
[5ab626a]808        linuxcommon_get_statistics,     /* get_statistics */
[1871afc]809        NULL,                           /* get_fd */
810        NULL,                           /* trace_event */
811        linuxring_help,                 /* help */
812        NULL,                           /* next pointer */
813        NON_PARALLEL(true)
814};
[9d89626]815#endif /* HAVE_NETPACKET_PACKET_H */
[1871afc]816
817/* TODO: Figure out how to give this format preference over the linux native
818 * formate if the user only specifies an interface */
819void linuxring_constructor(void)
820{
821        register_format(&linuxring);
822}
Note: See TracBrowser for help on using the repository browser.