source: lib/format_linux_ring.c @ 8a63abd

cachetimestampsdeveloprc-4.0.4ringdecrementfixringperformance
Last change on this file since 8a63abd was 8a63abd, checked in by Shane Alcock <salcock@…>, 2 years ago

Fix build issue on FreeBSD due to global declared in bad place.

Global mutex for the pagesize was declared inside some Linux
only code, but used in code for all systems.

  • Property mode set to 100644
File size: 24.5 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27/* This format module deals with using the Linux Ring capture format (also
28 * known as PACKET_MMAP).
29 *
30 * Linux Ring is a LIVE capture format.
31 *
32 * This format also supports writing which will write packets out to the
33 * network as a form of packet replay. This should not be confused with the
34 * RT protocol which is intended to transfer captured packet records between
35 * RT-speaking programs.
36 */
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include <stdlib.h>
44#include <errno.h>
45#include <unistd.h>
46#include <string.h>
47#include <assert.h>
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include "format_linux_common.h"
56
57/* Get the start of the captured data. I'm not sure if tp_mac (link layer) is
58 * always guaranteed. If it's not there then just use tp_net.
59 */
60#define TP_TRACE_START(mac, net, hdrend) \
61        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
62
63static pthread_mutex_t pagesize_mutex;
64#ifdef HAVE_NETPACKET_PACKET_H
65/* Get current frame in the ring buffer*/
66#define GET_CURRENT_BUFFER(stream) \
67        ((void *)stream->rx_ring +                              \
68         (stream->rxring_offset *                               \
69          stream->req.tp_frame_size))
70
71/* Cached page size, the page size shouldn't be changing */
72static int pagesize = 0;
73
74
75/*
76 * Try figure out the best sizes for the ring buffer. Ensure that:
77 * - max(Block_size) == page_size << max_order
78 * - Frame_size == page_size << x (so that block_size%frame_size == 0)
79 *   This means that there will be no wasted space between blocks
80 * - Frame_size < block_size
81 * - Frame_size is as close as possible to LIBTRACE_PACKET_BUFSIZE, but not
82 *   bigger
83 * - Frame_nr = Block_nr * (frames per block)
84 * - CONF_RING_FRAMES is used a minimum number of frames to hold
85 * - Calculates based on max_order and buf_min
86 */
87static void calculate_buffers(struct tpacket_req * req, int fd, char * uri,
88                uint32_t max_order)
89{
90        struct ifreq ifr;
91        unsigned max_frame = LIBTRACE_PACKET_BUFSIZE;
92        pthread_mutex_lock(&pagesize_mutex);
93        if (pagesize == 0) {
94                pagesize = getpagesize();
95        }
96        pthread_mutex_unlock(&pagesize_mutex);
97
98        strcpy(ifr.ifr_name, uri);
99        /* Don't bother trying to set frame size above mtu linux will drop
100         * these anyway.
101         *
102         * Remember, that our frame also has to include a TPACKET header!
103         */
104        if (ioctl(fd, SIOCGIFMTU, (caddr_t)&ifr) >= 0)
105                max_frame = ifr.ifr_mtu + TPACKET_ALIGN(TPACKET2_HDRLEN);
106        if (max_frame > LIBTRACE_PACKET_BUFSIZE)
107                max_frame = LIBTRACE_PACKET_BUFSIZE;
108
109        /* Calculate frame size */
110        req->tp_frame_size = pagesize;
111        while (req->tp_frame_size < max_frame &&
112              req->tp_frame_size < LIBTRACE_PACKET_BUFSIZE) {
113                req->tp_frame_size <<= 1;
114        }
115        if (req->tp_frame_size > LIBTRACE_PACKET_BUFSIZE)
116                req->tp_frame_size >>= 1;
117
118        /* Calculate block size */
119        req->tp_block_size = pagesize << max_order;
120        /* If max order is too high this might become 0 */
121        if (req->tp_block_size == 0) {
122                calculate_buffers(req, fd, uri, max_order-1);
123                return;
124        }
125        do {
126                req->tp_block_size >>= 1;
127        } while ((CONF_RING_FRAMES * req->tp_frame_size) <= req->tp_block_size);
128        req->tp_block_size <<= 1;
129
130        /* Calculate number of blocks */
131        req->tp_block_nr = (CONF_RING_FRAMES * req->tp_frame_size)
132                / req->tp_block_size;
133        if((CONF_RING_FRAMES * req->tp_frame_size) % req->tp_block_size != 0)
134                req->tp_block_nr++;
135
136        /* Calculate packets such that we use all the space we have to
137         * allocated */
138        req->tp_frame_nr = req->tp_block_nr *
139                (req->tp_block_size / req->tp_frame_size);
140
141        /*
142        printf("MaxO 0x%x BS 0x%x BN 0x%x FS 0x%x FN 0x%x\n",
143                max_order,
144                req->tp_block_size,
145                req->tp_block_nr,
146                req->tp_frame_size,
147                req->tp_frame_nr);
148        */
149
150        /* In case we have some silly values*/
151        assert(req->tp_block_size);
152        assert(req->tp_block_nr);
153        assert(req->tp_frame_size);
154        assert(req->tp_frame_nr);
155        assert(req->tp_block_size % req->tp_frame_size == 0);
156}
157
158static inline int socket_to_packetmmap(char * uridata, int ring_type,
159                                        int fd,
160                                        struct tpacket_req * req,
161                                        char ** ring_location,
162                                        uint32_t *max_order,
163                                        char *error) {
164        int val;
165
166        /* Switch to TPACKET header version 2, we only try support v2 because
167         * v1 had problems with data type consistancy */
168        val = TPACKET_V2;
169        if (setsockopt(fd,
170                       SOL_PACKET,
171                       PACKET_VERSION,
172                       &val,
173                       sizeof(val)) == -1) {
174                strncpy(error, "TPACKET2 not supported", 2048);
175                return -1;
176        }
177
178        /* Try switch to a ring buffer. If it fails we assume the the kernel
179         * cannot allocate a block of that size, so decrease max_block and
180         * retry.
181         */
182        while(1) {
183                if (*max_order <= 0) {
184                        strncpy(error,
185                                "Cannot allocate enough memory for ring buffer",
186                                2048);
187                        return -1;
188                }
189                calculate_buffers(req, fd, uridata, *max_order);
190                if (setsockopt(fd,
191                               SOL_PACKET,
192                               ring_type,
193                               req,
194                               sizeof(struct tpacket_req)) == -1) {
195                        if(errno == ENOMEM) {
196                                (*max_order)--;
197                        } else {
198                                strncpy(error,
199                                        "Error setting the ring buffer size",
200                                        2048);
201                                return -1;
202                        }
203
204                } else break;
205        }
206
207        /* Map the ring buffer into userspace */
208        *ring_location = mmap(NULL,
209                              req->tp_block_size * req->tp_block_nr,
210                              PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
211        if(*ring_location == MAP_FAILED) {
212                strncpy(error, "Failed to map memory for ring buffer", 2048);
213                return -1;
214        }
215
216        return 0;
217}
218
219/* Release a frame back to the kernel or free() if it's a malloc'd buffer
220 */
221inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
222                                      libtrace_packet_t *packet)
223{
224        /* Free the old packet */
225        if(packet->buffer == NULL)
226                return;
227
228        if(packet->buf_control == TRACE_CTRL_PACKET){
229                free(packet->buffer);
230                packet->buffer = NULL;
231        }
232
233        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
234                //struct linux_format_data_t *ftd = FORMAT_DATA;
235                /* Check it's within our buffer first - consider the pause
236                 * resume case it might have already been free'd lets hope we
237                 * get another buffer */
238                // TODO: For now let any one free anything
239                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
240                                (char *) ftd->rx_ring,
241                                ftd->rx_ring +
242                                ftd->req.tp_block_size *
243                                ftd->req.tp_block_nr)){*/
244                TO_TP_HDR2(packet->buffer)->tp_status = 0;
245                packet->buffer = NULL;
246                /*}*/
247        }
248}
249
250static inline int linuxring_start_input_stream(libtrace_t *libtrace,
251                                               struct linux_per_stream_t *stream) {
252        char error[2048];
253
254        /* We set the socket up the same and then convert it to PACKET_MMAP */
255        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
256                return -1;
257
258        strncpy(error, "No known error", 2048);
259
260        /* Make it a packetmmap */
261        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
262                                stream->fd,
263                                &stream->req,
264                                &stream->rx_ring,
265                                &FORMAT_DATA->max_order,
266                                error) != 0) {
267                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
268                              "Initialisation of packet MMAP failed: %s",
269                              error);
270                linuxcommon_close_input_stream(libtrace, stream);
271                return -1;
272        }
273
274        return 0;
275}
276
277static int linuxring_start_input(libtrace_t *libtrace)
278{
279        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
280        return ret;
281}
282
283#ifdef HAVE_PACKET_FANOUT
284static int linuxring_pstart_input(libtrace_t *libtrace) {
285        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
286}
287#endif
288
289static int linuxring_start_output(libtrace_out_t *libtrace)
290{
291        char error[2048];
292        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
293        if (FORMAT_DATA_OUT->fd==-1) {
294                free(FORMAT_DATA_OUT);
295                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
296                return -1;
297        }
298
299        /* Make it a packetmmap */
300        if(socket_to_packetmmap(libtrace->uridata, PACKET_TX_RING,
301                                FORMAT_DATA_OUT->fd,
302                                &FORMAT_DATA_OUT->req,
303                                &FORMAT_DATA_OUT->tx_ring,
304                                &FORMAT_DATA_OUT->max_order,
305                                error) != 0) {
306                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED,
307                                  "Initialisation of packet MMAP failed: %s",
308                                  error);
309                close(FORMAT_DATA_OUT->fd);
310                free(FORMAT_DATA_OUT);
311                libtrace->format_data = NULL;
312                return -1;
313        }
314
315        FORMAT_DATA_OUT->sock_hdr.sll_family = AF_PACKET;
316        FORMAT_DATA_OUT->sock_hdr.sll_protocol = 0;
317        FORMAT_DATA_OUT->sock_hdr.sll_ifindex =
318                if_nametoindex(libtrace->uridata);
319        FORMAT_DATA_OUT->sock_hdr.sll_hatype = 0;
320        FORMAT_DATA_OUT->sock_hdr.sll_pkttype = 0;
321        FORMAT_DATA_OUT->sock_hdr.sll_halen = 0;
322        FORMAT_DATA_OUT->queue = 0;
323
324        return 0;
325}
326
327static int linuxring_fin_output(libtrace_out_t *libtrace)
328{
329        /* Make sure any remaining frames get sent */
330        sendto(FORMAT_DATA_OUT->fd,
331               NULL,
332               0,
333               0,
334               (void *) &FORMAT_DATA_OUT->sock_hdr,
335               sizeof(FORMAT_DATA_OUT->sock_hdr));
336
337        /* Unmap our data area */
338        munmap(FORMAT_DATA_OUT->tx_ring,
339               FORMAT_DATA_OUT->req.tp_block_size *
340               FORMAT_DATA_OUT->req.tp_block_nr);
341
342        /* Free the socket */
343        close(FORMAT_DATA_OUT->fd);
344        FORMAT_DATA_OUT->fd=-1;
345        free(libtrace->format_data);
346        return 0;
347}
348#endif /* HAVE_NETPACKET_PACKET_H */
349
350static libtrace_linktype_t
351linuxring_get_link_type(const struct libtrace_packet_t *packet)
352{
353        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
354        return linuxcommon_get_link_type(linktype);
355}
356
357static libtrace_direction_t
358linuxring_get_direction(const struct libtrace_packet_t *packet) {
359        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
360                                         sll_pkttype);
361}
362
363static libtrace_direction_t
364linuxring_set_direction(libtrace_packet_t *packet,
365                        libtrace_direction_t direction) {
366        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
367}
368
369static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
370{
371        struct timeval tv;
372        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
373        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
374        return tv;
375}
376
377static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
378{
379        struct timespec ts;
380        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
381        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
382        return ts;
383}
384
385static int linuxring_get_capture_length(const libtrace_packet_t *packet)
386{
387        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
388}
389
390static int linuxring_get_wire_length(const libtrace_packet_t *packet)
391{
392        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
393
394        /* Include the missing FCS */
395        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
396                wirelen += 4;
397
398        return wirelen;
399}
400
401static int linuxring_get_framing_length(const libtrace_packet_t *packet)
402{
403        /*
404         * Need to make frame_length + capture_length = complete capture length
405         * so include alignment whitespace. So reverse calculate from packet.
406         */
407        return (char *)packet->payload - (char *)packet->buffer;
408}
409
410static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
411                                           size_t size)
412{
413        assert(packet);
414        if (size > trace_get_capture_length(packet)) {
415                /* We should avoid making a packet larger */
416                return trace_get_capture_length(packet);
417        }
418
419        /* Reset the cached capture length */
420        packet->capture_length = -1;
421
422        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
423
424        return trace_get_capture_length(packet);
425}
426
427static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
428                                    libtrace_packet_t *packet, void *buffer,
429                                    libtrace_rt_types_t rt_type, uint32_t flags)
430{
431        if (packet->buffer != buffer &&
432            packet->buf_control == TRACE_CTRL_PACKET) {
433                free(packet->buffer);
434        }
435
436        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
437                packet->buf_control = TRACE_CTRL_PACKET;
438        else
439                packet->buf_control = TRACE_CTRL_EXTERNAL;
440
441
442        packet->buffer = buffer;
443        packet->header = buffer;
444        packet->payload = (char *)buffer +
445                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
446                               TO_TP_HDR2(packet->header)->tp_net,
447                               TPACKET2_HDRLEN);
448        packet->type = rt_type;
449
450        return 0;
451}
452
453#ifdef HAVE_NETPACKET_PACKET_H
454#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
455/* We use TP_STATUS_LIBTRACE to ensure we don't loop back on ourself
456 * and read the same packet twice if an old packet has not yet been freed */
457#define TP_STATUS_LIBTRACE 0xFFFFFFFF
458
459inline static int linuxring_read_stream(libtrace_t *libtrace,
460                                        libtrace_packet_t *packet,
461                                        struct linux_per_stream_t *stream,
462                                        libtrace_message_queue_t *queue) {
463
464        struct tpacket2_hdr *header;
465        int ret;
466        unsigned int snaplen;
467        struct pollfd pollset[2];
468
469        packet->buf_control = TRACE_CTRL_EXTERNAL;
470        packet->type = TRACE_RT_DATA_LINUX_RING;
471       
472        /* Fetch the current frame */
473        header = GET_CURRENT_BUFFER(stream);
474        assert((((unsigned long) header) & (pagesize - 1)) == 0);
475
476        /* TP_STATUS_USER means that we can use the frame.
477         * When a slot does not have this flag set, the frame is not
478         * ready for consumption.
479         */
480        while (!(header->tp_status & TP_STATUS_USER) ||
481               header->tp_status == TP_STATUS_LIBTRACE) {
482                if ((ret=is_halted(libtrace)) != -1)
483                        return ret;
484                pollset[0].fd = stream->fd;
485                pollset[0].events = POLLIN;
486                pollset[0].revents = 0;
487                if (queue) {
488                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
489                        pollset[1].events = POLLIN;
490                        pollset[1].revents = 0;
491                }
492                /* Wait for more data or a message */
493                ret = poll(pollset, (queue ? 2 : 1), 500);
494                if (ret > 0) {
495                        if (pollset[0].revents == POLLIN)
496                                continue;
497                        else if (queue && pollset[1].revents == POLLIN)
498                                return READ_MESSAGE;
499                        else if (queue && pollset[1].revents) {
500                                /* Internal error */
501                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
502                                              "Message queue error %d poll()",
503                                              pollset[1].revents);
504                                return READ_ERROR;
505                        } else {
506                                /* Try get the error from the socket */
507                                int err = ENETDOWN;
508                                socklen_t len = sizeof(err);
509                                getsockopt(stream->fd, SOL_SOCKET, SO_ERROR,
510                                           &err, &len);
511                                trace_set_err(libtrace, err,
512                                              "Socket error revents=%d poll()",
513                                              pollset[0].revents);
514                                return READ_ERROR;
515                        }
516                } else if (ret < 0) {
517                        if (errno != EINTR) {
518                                trace_set_err(libtrace,errno,"poll()");
519                                return -1;
520                        }
521                } else {
522                        /* Poll timed out - check if we should exit on next loop */
523                        continue;
524                }
525        }
526        packet->buffer = header;
527        packet->trace = libtrace;
528       
529        header->tp_status = TP_STATUS_LIBTRACE;
530
531        /* If a snaplen was configured, automatically truncate the packet to
532         * the desired length.
533         */
534        snaplen=LIBTRACE_MIN(
535                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
536                        (int)FORMAT_DATA->snaplen);
537       
538        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
539
540        /* Move to next buffer */
541        stream->rxring_offset++;
542        stream->rxring_offset %= stream->req.tp_frame_nr;
543
544        packet->order = (((uint64_t)TO_TP_HDR2(packet->buffer)->tp_sec) << 32)
545                        + ((((uint64_t)TO_TP_HDR2(packet->buffer)->tp_nsec)
546                        << 32) / 1000000000);
547
548        if (packet->order <= stream->last_timestamp) {
549                packet->order = stream->last_timestamp + 1;
550        }
551
552        stream->last_timestamp = packet->order;
553
554        /* We just need to get prepare_packet to set all our packet pointers
555         * appropriately */
556        if (linuxring_prepare_packet(libtrace, packet, packet->buffer,
557                                packet->type, 0))
558                return -1;
559        return  linuxring_get_framing_length(packet) + 
560                                linuxring_get_capture_length(packet);
561
562}
563
564static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
565        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
566}
567
568#ifdef HAVE_PACKET_FANOUT
569static int linuxring_pread_packets(libtrace_t *libtrace,
570                                   libtrace_thread_t *t,
571                                   libtrace_packet_t *packets[],
572                                   UNUSED size_t nb_packets) {
573        /* For now just read one packet */
574        packets[0]->error = linuxring_read_stream(libtrace, packets[0],
575                                                  t->format_data, &t->messages);
576        if (packets[0]->error >= 1)
577                return 1;
578        else
579                return packets[0]->error;
580}
581#endif
582
583/* Non-blocking read */
584static libtrace_eventobj_t linuxring_event(libtrace_t *libtrace,
585                                           libtrace_packet_t *packet)
586{
587        struct tpacket2_hdr *header;
588        libtrace_eventobj_t event = {0,0,0.0,0};
589
590        /* We must free the old packet, otherwise select() will instantly
591         * return */
592        ring_release_frame(libtrace, packet);
593
594        /* Fetch the current frame */
595        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
596        if (header->tp_status & TP_STATUS_USER &&
597            header->tp_status != TP_STATUS_LIBTRACE) {
598                /* We have a frame waiting */
599                event.size = trace_read_packet(libtrace, packet);
600                event.type = TRACE_EVENT_PACKET;
601        } else {
602                /* Ok we don't have a packet waiting */
603                event.type = TRACE_EVENT_IOWAIT;
604                event.fd = FORMAT_DATA_FIRST->fd;
605        }
606
607        return event;
608}
609
610/**
611 * Free any resources being kept for this packet, Note: libtrace
612 * will ensure all fields are zeroed correctly.
613 */
614static void linuxring_fin_packet(libtrace_packet_t *packet)
615{
616        libtrace_t *libtrace = packet->trace;
617
618        if (packet->buffer == NULL)
619                return;
620        assert(packet->trace);
621
622        /* If we own the packet (i.e. it's not a copy), we need to free it */
623        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
624                /* Started should always match the existence of the rx_ring
625                 * in the parallel case still just check the first ring */
626                assert(!!FORMAT_DATA_FIRST->rx_ring ==
627                       !!packet->trace->started);
628                /* If we don't have a ring its already been destroyed */
629                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
630                        ring_release_frame(packet->trace, packet);
631                else
632                        packet->buffer = NULL;
633        }
634}
635
636static int linuxring_write_packet(libtrace_out_t *libtrace,
637                                  libtrace_packet_t *packet)
638{
639        struct tpacket2_hdr *header;
640        struct pollfd pollset;
641        struct socket_addr;
642        int ret;
643        unsigned max_size;
644        void * off;
645
646        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
647                return 0;
648
649        max_size = FORMAT_DATA_OUT->req.tp_frame_size -
650                TPACKET2_HDRLEN + sizeof(struct sockaddr_ll);
651
652        header = (void *)FORMAT_DATA_OUT->tx_ring +
653                (FORMAT_DATA_OUT->txring_offset *
654                 FORMAT_DATA_OUT->req.tp_frame_size);
655
656        while(header->tp_status != TP_STATUS_AVAILABLE) {
657                /* if none available: wait on more data */
658                pollset.fd = FORMAT_DATA_OUT->fd;
659                pollset.events = POLLOUT;
660                pollset.revents = 0;
661                ret = poll(&pollset, 1, 1000);
662                if (ret < 0 && errno != EINTR) {
663                        perror("poll");
664                        return -1;
665                }
666                if(ret == 0) {
667                        /* Timeout something has gone wrong - maybe the queue is
668                         * to large so try issue another send command
669                         */
670                        ret = sendto(FORMAT_DATA_OUT->fd,
671                                     NULL,
672                                     0,
673                                     0,
674                                     (void *)&FORMAT_DATA_OUT->sock_hdr,
675                                     sizeof(FORMAT_DATA_OUT->sock_hdr));
676                        if (ret < 0) {
677                                trace_set_err_out(libtrace, errno,
678                                                  "sendto after timeout "
679                                                  "failed");
680                                return -1;
681                        }
682                }
683        }
684
685        header->tp_len = trace_get_capture_length(packet);
686
687        /* We cannot write the whole packet so just write part of it */
688        if (header->tp_len > max_size)
689                header->tp_len = max_size;
690
691        /* Fill packet - no sockaddr_ll in header when writing to the TX_RING */
692        off = ((void *)header) + (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll));
693        memcpy(off, (char *)packet->payload, header->tp_len);
694
695        /* 'Send it' and increase ring pointer to the next frame */
696        header->tp_status = TP_STATUS_SEND_REQUEST;
697        FORMAT_DATA_OUT->txring_offset = (FORMAT_DATA_OUT->txring_offset + 1) %
698                FORMAT_DATA_OUT->req.tp_frame_nr;
699
700        /* Notify kernel there are frames to send */
701        FORMAT_DATA_OUT->queue ++;
702        FORMAT_DATA_OUT->queue %= TX_MAX_QUEUE;
703        if(FORMAT_DATA_OUT->queue == 0){
704                ret = sendto(FORMAT_DATA_OUT->fd,
705                                NULL,
706                                0,
707                                MSG_DONTWAIT,
708                                (void *)&FORMAT_DATA_OUT->sock_hdr,
709                                sizeof(FORMAT_DATA_OUT->sock_hdr));
710                if (ret < 0) {
711                        trace_set_err_out(libtrace, errno, "sendto failed");
712                        return -1;
713                }
714        }
715        return header->tp_len;
716
717}
718
719static void linuxring_help(void)
720{
721        printf("linuxring format module: $Revision: 1793 $\n");
722        printf("Supported input URIs:\n");
723        printf("\tring:eth0\n");
724        printf("\n");
725        printf("Supported output URIs:\n");
726        printf("\tring:eth0\n");
727        printf("\n");
728        return;
729}
730
731static struct libtrace_format_t linuxring = {
732        "ring",
733        "$Id$",
734        TRACE_FORMAT_LINUX_RING,
735        linuxcommon_probe_filename,     /* probe filename */
736        NULL,                           /* probe magic */
737        linuxcommon_init_input,         /* init_input */
738        linuxcommon_config_input,       /* config_input */
739        linuxring_start_input,          /* start_input */
740        linuxcommon_pause_input,        /* pause_input */
741        linuxcommon_init_output,        /* init_output */
742        NULL,                           /* config_output */
743        linuxring_start_output,         /* start_ouput */
744        linuxcommon_fin_input,          /* fin_input */
745        linuxring_fin_output,           /* fin_output */
746        linuxring_read_packet,          /* read_packet */
747        linuxring_prepare_packet,       /* prepare_packet */
748        linuxring_fin_packet,           /* fin_packet */
749        linuxring_write_packet,         /* write_packet */
750        NULL,                           /* flush_output */
751        linuxring_get_link_type,        /* get_link_type */
752        linuxring_get_direction,        /* get_direction */
753        linuxring_set_direction,        /* set_direction */
754        NULL,                           /* get_erf_timestamp */
755        linuxring_get_timeval,          /* get_timeval */
756        linuxring_get_timespec,         /* get_timespec */
757        NULL,                           /* get_seconds */
758        NULL,                           /* seek_erf */
759        NULL,                           /* seek_timeval */
760        NULL,                           /* seek_seconds */
761        linuxring_get_capture_length,   /* get_capture_length */
762        linuxring_get_wire_length,      /* get_wire_length */
763        linuxring_get_framing_length,   /* get_framing_length */
764        linuxring_set_capture_length,   /* set_capture_length */
765        NULL,                           /* get_received_packets */
766        NULL,                           /* get_filtered_packets */
767        NULL,                           /* get_dropped_packets */
768        linuxcommon_get_statistics,     /* get_statistics */
769        linuxcommon_get_fd,             /* get_fd */
770        linuxring_event,                /* trace_event */
771        linuxring_help,                 /* help */
772        NULL,                           /* next pointer */
773#ifdef HAVE_PACKET_FANOUT
774        {true, -1},                     /* Live, no thread limit */
775        linuxring_pstart_input,         /* pstart_input */
776        linuxring_pread_packets,        /* pread_packets */
777        linuxcommon_pause_input,        /* ppause */
778        linuxcommon_fin_input,          /* p_fin */
779        linuxcommon_pregister_thread,   /* register thread */
780        NULL,                           /* unregister thread */
781        NULL                            /* get thread stats */
782#else
783        NON_PARALLEL(true)
784#endif
785};
786#else /* HAVE_NETPACKET_PACKET_H */
787
788static void linuxring_help(void)
789{
790        printf("linuxring format module: $Revision: 1793 $\n");
791        printf("Not supported on this host\n");
792}
793
794static struct libtrace_format_t linuxring = {
795        "ring",
796        "$Id$",
797        TRACE_FORMAT_LINUX_RING,
798        NULL,                           /* probe filename */
799        NULL,                           /* probe magic */
800        NULL,                           /* init_input */
801        NULL,                           /* config_input */
802        NULL,                           /* start_input */
803        NULL,                           /* pause_input */
804        NULL,                           /* init_output */
805        NULL,                           /* config_output */
806        NULL,                           /* start_ouput */
807        NULL,                           /* fin_input */
808        NULL,                           /* fin_output */
809        NULL,                           /* read_packet */
810        linuxring_prepare_packet,       /* prepare_packet */
811        NULL,                           /* fin_packet */
812        NULL,                           /* write_packet */
813        NULL,                           /* flush_output */
814        linuxring_get_link_type,        /* get_link_type */
815        linuxring_get_direction,        /* get_direction */
816        linuxring_set_direction,        /* set_direction */
817        NULL,                           /* get_erf_timestamp */
818        linuxring_get_timeval,          /* get_timeval */
819        linuxring_get_timespec,         /* get_timespec */
820        NULL,                           /* get_seconds */
821        NULL,                           /* seek_erf */
822        NULL,                           /* seek_timeval */
823        NULL,                           /* seek_seconds */
824        linuxring_get_capture_length,   /* get_capture_length */
825        linuxring_get_wire_length,      /* get_wire_length */
826        linuxring_get_framing_length,   /* get_framing_length */
827        linuxring_set_capture_length,   /* set_capture_length */
828        NULL,                           /* get_received_packets */
829        NULL,                           /* get_filtered_packets */
830        NULL,                           /* get_dropped_packets */
831        linuxcommon_get_statistics,     /* get_statistics */
832        NULL,                           /* get_fd */
833        NULL,                           /* trace_event */
834        linuxring_help,                 /* help */
835        NULL,                           /* next pointer */
836        NON_PARALLEL(true)
837};
838#endif /* HAVE_NETPACKET_PACKET_H */
839
840/* TODO: Figure out how to give this format preference over the linux native
841 * formate if the user only specifies an interface */
842void linuxring_constructor(void)
843{
844        pthread_mutex_init(&pagesize_mutex, NULL);
845        register_format(&linuxring);
846}
Note: See TracBrowser for help on using the repository browser.