source: lib/format_linux_ring.c @ db84bb2

4.0.1-hotfixescachetimestampsdevelopdpdk-ndagetsilivendag_formatrc-4.0.1rc-4.0.2rc-4.0.3rc-4.0.4ringdecrementfixringperformanceringtimestampfixes
Last change on this file since db84bb2 was db84bb2, checked in by Shane Alcock <salcock@…>, 4 years ago

Ensure packet->order is always strictly incrementing

We cannot equate timestamp with packet->order, as some timestamp
methods are not strictly monotonic (ring: and int:).

Each format is now responsible for determining packet->order
during pread, so that the format can detect and correct such
inaccuracies.

More specifically, ring: and int: will cache the last reported
timestamp per thread and if time goes backwards, the order will
be set to last+1, otherwise the timestamp will be used.

DAG and DPDK still use the timestamp for ordering, since there
have been no issues with the timestamp ordering for these formats
(thus far!).

  • Property mode set to 100644
File size: 24.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27/* This format module deals with using the Linux Ring capture format (also
28 * known as PACKET_MMAP).
29 *
30 * Linux Ring is a LIVE capture format.
31 *
32 * This format also supports writing which will write packets out to the
33 * network as a form of packet replay. This should not be confused with the
34 * RT protocol which is intended to transfer captured packet records between
35 * RT-speaking programs.
36 */
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include <stdlib.h>
44#include <errno.h>
45#include <unistd.h>
46#include <string.h>
47#include <assert.h>
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include "format_linux_common.h"
56
57/* Get the start of the captured data. I'm not sure if tp_mac (link layer) is
58 * always guaranteed. If it's not there then just use tp_net.
59 */
60#define TP_TRACE_START(mac, net, hdrend) \
61        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
62
63#ifdef HAVE_NETPACKET_PACKET_H
64/* Get current frame in the ring buffer*/
65#define GET_CURRENT_BUFFER(stream) \
66        ((void *)stream->rx_ring +                              \
67         (stream->rxring_offset *                               \
68          stream->req.tp_frame_size))
69
70/* Cached page size, the page size shouldn't be changing */
71static int pagesize = 0;
72
73/*
74 * Try figure out the best sizes for the ring buffer. Ensure that:
75 * - max(Block_size) == page_size << max_order
76 * - Frame_size == page_size << x (so that block_size%frame_size == 0)
77 *   This means that there will be no wasted space between blocks
78 * - Frame_size < block_size
79 * - Frame_size is as close as possible to LIBTRACE_PACKET_BUFSIZE, but not
80 *   bigger
81 * - Frame_nr = Block_nr * (frames per block)
82 * - CONF_RING_FRAMES is used a minimum number of frames to hold
83 * - Calculates based on max_order and buf_min
84 */
85static void calculate_buffers(struct tpacket_req * req, int fd, char * uri,
86                uint32_t max_order)
87{
88        struct ifreq ifr;
89        unsigned max_frame = LIBTRACE_PACKET_BUFSIZE;
90        pagesize = getpagesize();
91
92        strcpy(ifr.ifr_name, uri);
93        /* Don't bother trying to set frame size above mtu linux will drop
94         * these anyway.
95         *
96         * Remember, that our frame also has to include a TPACKET header!
97         */
98        if (ioctl(fd, SIOCGIFMTU, (caddr_t)&ifr) >= 0)
99                max_frame = ifr.ifr_mtu + TPACKET_ALIGN(TPACKET2_HDRLEN);
100        if (max_frame > LIBTRACE_PACKET_BUFSIZE)
101                max_frame = LIBTRACE_PACKET_BUFSIZE;
102
103        /* Calculate frame size */
104        req->tp_frame_size = pagesize;
105        while (req->tp_frame_size < max_frame &&
106              req->tp_frame_size < LIBTRACE_PACKET_BUFSIZE) {
107                req->tp_frame_size <<= 1;
108        }
109        if (req->tp_frame_size > LIBTRACE_PACKET_BUFSIZE)
110                req->tp_frame_size >>= 1;
111
112        /* Calculate block size */
113        req->tp_block_size = pagesize << max_order;
114        /* If max order is too high this might become 0 */
115        if (req->tp_block_size == 0) {
116                calculate_buffers(req, fd, uri, max_order-1);
117                return;
118        }
119        do {
120                req->tp_block_size >>= 1;
121        } while ((CONF_RING_FRAMES * req->tp_frame_size) <= req->tp_block_size);
122        req->tp_block_size <<= 1;
123
124        /* Calculate number of blocks */
125        req->tp_block_nr = (CONF_RING_FRAMES * req->tp_frame_size)
126                / req->tp_block_size;
127        if((CONF_RING_FRAMES * req->tp_frame_size) % req->tp_block_size != 0)
128                req->tp_block_nr++;
129
130        /* Calculate packets such that we use all the space we have to
131         * allocated */
132        req->tp_frame_nr = req->tp_block_nr *
133                (req->tp_block_size / req->tp_frame_size);
134
135        /*
136        printf("MaxO 0x%x BS 0x%x BN 0x%x FS 0x%x FN 0x%x\n",
137                max_order,
138                req->tp_block_size,
139                req->tp_block_nr,
140                req->tp_frame_size,
141                req->tp_frame_nr);
142        */
143
144        /* In case we have some silly values*/
145        assert(req->tp_block_size);
146        assert(req->tp_block_nr);
147        assert(req->tp_frame_size);
148        assert(req->tp_frame_nr);
149        assert(req->tp_block_size % req->tp_frame_size == 0);
150}
151
152static inline int socket_to_packetmmap(char * uridata, int ring_type,
153                                        int fd,
154                                        struct tpacket_req * req,
155                                        char ** ring_location,
156                                        uint32_t *max_order,
157                                        char *error) {
158        int val;
159
160        /* Switch to TPACKET header version 2, we only try support v2 because
161         * v1 had problems with data type consistancy */
162        val = TPACKET_V2;
163        if (setsockopt(fd,
164                       SOL_PACKET,
165                       PACKET_VERSION,
166                       &val,
167                       sizeof(val)) == -1) {
168                strncpy(error, "TPACKET2 not supported", 2048);
169                return -1;
170        }
171
172        /* Try switch to a ring buffer. If it fails we assume the the kernel
173         * cannot allocate a block of that size, so decrease max_block and
174         * retry.
175         */
176        while(1) {
177                if (*max_order <= 0) {
178                        strncpy(error,
179                                "Cannot allocate enough memory for ring buffer",
180                                2048);
181                        return -1;
182                }
183                calculate_buffers(req, fd, uridata, *max_order);
184                if (setsockopt(fd,
185                               SOL_PACKET,
186                               ring_type,
187                               req,
188                               sizeof(struct tpacket_req)) == -1) {
189                        if(errno == ENOMEM) {
190                                (*max_order)--;
191                        } else {
192                                strncpy(error,
193                                        "Error setting the ring buffer size",
194                                        2048);
195                                return -1;
196                        }
197
198                } else break;
199        }
200
201        /* Map the ring buffer into userspace */
202        *ring_location = mmap(NULL,
203                              req->tp_block_size * req->tp_block_nr,
204                              PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
205        if(*ring_location == MAP_FAILED) {
206                strncpy(error, "Failed to map memory for ring buffer", 2048);
207                return -1;
208        }
209
210        return 0;
211}
212
213/* Release a frame back to the kernel or free() if it's a malloc'd buffer
214 */
215inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
216                                      libtrace_packet_t *packet)
217{
218        /* Free the old packet */
219        if(packet->buffer == NULL)
220                return;
221
222        if(packet->buf_control == TRACE_CTRL_PACKET){
223                free(packet->buffer);
224                packet->buffer = NULL;
225        }
226
227        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
228                //struct linux_format_data_t *ftd = FORMAT_DATA;
229                /* Check it's within our buffer first - consider the pause
230                 * resume case it might have already been free'd lets hope we
231                 * get another buffer */
232                // TODO: For now let any one free anything
233                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
234                                (char *) ftd->rx_ring,
235                                ftd->rx_ring +
236                                ftd->req.tp_block_size *
237                                ftd->req.tp_block_nr)){*/
238                TO_TP_HDR2(packet->buffer)->tp_status = 0;
239                packet->buffer = NULL;
240                /*}*/
241        }
242}
243
244static inline int linuxring_start_input_stream(libtrace_t *libtrace,
245                                               struct linux_per_stream_t *stream) {
246        char error[2048];
247
248        /* We set the socket up the same and then convert it to PACKET_MMAP */
249        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
250                return -1;
251
252        strncpy(error, "No known error", 2048);
253
254        /* Make it a packetmmap */
255        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
256                                stream->fd,
257                                &stream->req,
258                                &stream->rx_ring,
259                                &FORMAT_DATA->max_order,
260                                error) != 0) {
261                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
262                              "Initialisation of packet MMAP failed: %s",
263                              error);
264                linuxcommon_close_input_stream(libtrace, stream);
265                return -1;
266        }
267
268        return 0;
269}
270
271static int linuxring_start_input(libtrace_t *libtrace)
272{
273        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
274        return ret;
275}
276
277#ifdef HAVE_PACKET_FANOUT
278static int linuxring_pstart_input(libtrace_t *libtrace) {
279        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
280}
281#endif
282
283static int linuxring_start_output(libtrace_out_t *libtrace)
284{
285        char error[2048];
286        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
287        if (FORMAT_DATA_OUT->fd==-1) {
288                free(FORMAT_DATA_OUT);
289                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
290                return -1;
291        }
292
293        /* Make it a packetmmap */
294        if(socket_to_packetmmap(libtrace->uridata, PACKET_TX_RING,
295                                FORMAT_DATA_OUT->fd,
296                                &FORMAT_DATA_OUT->req,
297                                &FORMAT_DATA_OUT->tx_ring,
298                                &FORMAT_DATA_OUT->max_order,
299                                error) != 0) {
300                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED,
301                                  "Initialisation of packet MMAP failed: %s",
302                                  error);
303                close(FORMAT_DATA_OUT->fd);
304                free(FORMAT_DATA_OUT);
305                libtrace->format_data = NULL;
306                return -1;
307        }
308
309        FORMAT_DATA_OUT->sock_hdr.sll_family = AF_PACKET;
310        FORMAT_DATA_OUT->sock_hdr.sll_protocol = 0;
311        FORMAT_DATA_OUT->sock_hdr.sll_ifindex =
312                if_nametoindex(libtrace->uridata);
313        FORMAT_DATA_OUT->sock_hdr.sll_hatype = 0;
314        FORMAT_DATA_OUT->sock_hdr.sll_pkttype = 0;
315        FORMAT_DATA_OUT->sock_hdr.sll_halen = 0;
316        FORMAT_DATA_OUT->queue = 0;
317
318        return 0;
319}
320
321static int linuxring_fin_output(libtrace_out_t *libtrace)
322{
323        /* Make sure any remaining frames get sent */
324        sendto(FORMAT_DATA_OUT->fd,
325               NULL,
326               0,
327               0,
328               (void *) &FORMAT_DATA_OUT->sock_hdr,
329               sizeof(FORMAT_DATA_OUT->sock_hdr));
330
331        /* Unmap our data area */
332        munmap(FORMAT_DATA_OUT->tx_ring,
333               FORMAT_DATA_OUT->req.tp_block_size *
334               FORMAT_DATA_OUT->req.tp_block_nr);
335
336        /* Free the socket */
337        close(FORMAT_DATA_OUT->fd);
338        FORMAT_DATA_OUT->fd=-1;
339        free(libtrace->format_data);
340        return 0;
341}
342#endif /* HAVE_NETPACKET_PACKET_H */
343
344static libtrace_linktype_t
345linuxring_get_link_type(const struct libtrace_packet_t *packet)
346{
347        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
348        return linuxcommon_get_link_type(linktype);
349}
350
351static libtrace_direction_t
352linuxring_get_direction(const struct libtrace_packet_t *packet) {
353        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
354                                         sll_pkttype);
355}
356
357static libtrace_direction_t
358linuxring_set_direction(libtrace_packet_t *packet,
359                        libtrace_direction_t direction) {
360        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
361}
362
363static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
364{
365        struct timeval tv;
366        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
367        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
368        return tv;
369}
370
371static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
372{
373        struct timespec ts;
374        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
375        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
376        return ts;
377}
378
379static int linuxring_get_capture_length(const libtrace_packet_t *packet)
380{
381        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
382}
383
384static int linuxring_get_wire_length(const libtrace_packet_t *packet)
385{
386        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
387
388        /* Include the missing FCS */
389        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
390                wirelen += 4;
391
392        return wirelen;
393}
394
395static int linuxring_get_framing_length(const libtrace_packet_t *packet)
396{
397        /*
398         * Need to make frame_length + capture_length = complete capture length
399         * so include alignment whitespace. So reverse calculate from packet.
400         */
401        return (char *)packet->payload - (char *)packet->buffer;
402}
403
404static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
405                                           size_t size)
406{
407        assert(packet);
408        if (size > trace_get_capture_length(packet)) {
409                /* We should avoid making a packet larger */
410                return trace_get_capture_length(packet);
411        }
412
413        /* Reset the cached capture length */
414        packet->capture_length = -1;
415
416        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
417
418        return trace_get_capture_length(packet);
419}
420
421static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
422                                    libtrace_packet_t *packet, void *buffer,
423                                    libtrace_rt_types_t rt_type, uint32_t flags)
424{
425        if (packet->buffer != buffer &&
426            packet->buf_control == TRACE_CTRL_PACKET) {
427                free(packet->buffer);
428        }
429
430        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
431                packet->buf_control = TRACE_CTRL_PACKET;
432        else
433                packet->buf_control = TRACE_CTRL_EXTERNAL;
434
435
436        packet->buffer = buffer;
437        packet->header = buffer;
438        packet->payload = (char *)buffer +
439                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
440                               TO_TP_HDR2(packet->header)->tp_net,
441                               TPACKET2_HDRLEN);
442        packet->type = rt_type;
443
444        return 0;
445}
446
447#ifdef HAVE_NETPACKET_PACKET_H
448#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
449/* We use TP_STATUS_LIBTRACE to ensure we don't loop back on ourself
450 * and read the same packet twice if an old packet has not yet been freed */
451#define TP_STATUS_LIBTRACE 0xFFFFFFFF
452
453inline static int linuxring_read_stream(libtrace_t *libtrace,
454                                        libtrace_packet_t *packet,
455                                        struct linux_per_stream_t *stream,
456                                        libtrace_message_queue_t *queue) {
457
458        struct tpacket2_hdr *header;
459        int ret;
460        unsigned int snaplen;
461        struct pollfd pollset[2];
462
463        ring_release_frame(libtrace, packet);
464       
465        packet->buf_control = TRACE_CTRL_EXTERNAL;
466        packet->type = TRACE_RT_DATA_LINUX_RING;
467       
468        /* Fetch the current frame */
469        header = GET_CURRENT_BUFFER(stream);
470        assert((((unsigned long) header) & (pagesize - 1)) == 0);
471
472        /* TP_STATUS_USER means that we can use the frame.
473         * When a slot does not have this flag set, the frame is not
474         * ready for consumption.
475         */
476        while (!(header->tp_status & TP_STATUS_USER) ||
477               header->tp_status == TP_STATUS_LIBTRACE) {
478                if ((ret=is_halted(libtrace)) != -1)
479                        return ret;
480                pollset[0].fd = stream->fd;
481                pollset[0].events = POLLIN;
482                pollset[0].revents = 0;
483                if (queue) {
484                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
485                        pollset[1].events = POLLIN;
486                        pollset[1].revents = 0;
487                }
488                /* Wait for more data or a message */
489                ret = poll(pollset, (queue ? 2 : 1), 500);
490                if (ret > 0) {
491                        if (pollset[0].revents == POLLIN)
492                                continue;
493                        else if (queue && pollset[1].revents == POLLIN)
494                                return READ_MESSAGE;
495                        else if (queue && pollset[1].revents) {
496                                /* Internal error */
497                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
498                                              "Message queue error %d poll()",
499                                              pollset[1].revents);
500                                return READ_ERROR;
501                        } else {
502                                /* Try get the error from the socket */
503                                int err = ENETDOWN;
504                                socklen_t len = sizeof(err);
505                                getsockopt(stream->fd, SOL_SOCKET, SO_ERROR,
506                                           &err, &len);
507                                trace_set_err(libtrace, err,
508                                              "Socket error revents=%d poll()",
509                                              pollset[0].revents);
510                                return READ_ERROR;
511                        }
512                } else if (ret < 0) {
513                        if (errno != EINTR) {
514                                trace_set_err(libtrace,errno,"poll()");
515                                return -1;
516                        }
517                } else {
518                        /* Poll timed out - check if we should exit on next loop */
519                        continue;
520                }
521        }
522        packet->buffer = header;
523        packet->trace = libtrace;
524       
525        header->tp_status = TP_STATUS_LIBTRACE;
526
527        /* If a snaplen was configured, automatically truncate the packet to
528         * the desired length.
529         */
530        snaplen=LIBTRACE_MIN(
531                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
532                        (int)FORMAT_DATA->snaplen);
533       
534        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
535
536        /* Move to next buffer */
537        stream->rxring_offset++;
538        stream->rxring_offset %= stream->req.tp_frame_nr;
539
540        packet->order = (((uint64_t)TO_TP_HDR2(packet->buffer)->tp_sec) << 32)
541                        + ((((uint64_t)TO_TP_HDR2(packet->buffer)->tp_nsec)
542                        << 32) / 1000000000);
543
544        if (packet->order <= stream->last_timestamp) {
545                packet->order = stream->last_timestamp + 1;
546        }
547
548        stream->last_timestamp = packet->order;
549               
550
551        /* We just need to get prepare_packet to set all our packet pointers
552         * appropriately */
553        if (linuxring_prepare_packet(libtrace, packet, packet->buffer,
554                                packet->type, 0))
555                return -1;
556        return  linuxring_get_framing_length(packet) + 
557                                linuxring_get_capture_length(packet);
558
559}
560
561static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
562        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
563}
564
565#ifdef HAVE_PACKET_FANOUT
566static int linuxring_pread_packets(libtrace_t *libtrace,
567                                   libtrace_thread_t *t,
568                                   libtrace_packet_t *packets[],
569                                   UNUSED size_t nb_packets) {
570        /* For now just read one packet */
571        packets[0]->error = linuxring_read_stream(libtrace, packets[0],
572                                                  t->format_data, &t->messages);
573        if (packets[0]->error >= 1)
574                return 1;
575        else
576                return packets[0]->error;
577}
578#endif
579
580/* Non-blocking read */
581static libtrace_eventobj_t linuxring_event(libtrace_t *libtrace,
582                                           libtrace_packet_t *packet)
583{
584        struct tpacket2_hdr *header;
585        libtrace_eventobj_t event = {0,0,0.0,0};
586
587        /* We must free the old packet, otherwise select() will instantly
588         * return */
589        ring_release_frame(libtrace, packet);
590
591        /* Fetch the current frame */
592        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
593        if (header->tp_status & TP_STATUS_USER &&
594            header->tp_status != TP_STATUS_LIBTRACE) {
595                /* We have a frame waiting */
596                event.size = trace_read_packet(libtrace, packet);
597                event.type = TRACE_EVENT_PACKET;
598        } else {
599                /* Ok we don't have a packet waiting */
600                event.type = TRACE_EVENT_IOWAIT;
601                event.fd = FORMAT_DATA_FIRST->fd;
602        }
603
604        return event;
605}
606
607/**
608 * Free any resources being kept for this packet, Note: libtrace
609 * will ensure all fields are zeroed correctly.
610 */
611static void linuxring_fin_packet(libtrace_packet_t *packet)
612{
613        libtrace_t *libtrace = packet->trace;
614
615        if (packet->buffer == NULL)
616                return;
617        assert(packet->trace);
618
619        /* If we own the packet (i.e. it's not a copy), we need to free it */
620        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
621                /* Started should always match the existence of the rx_ring
622                 * in the parallel case still just check the first ring */
623                assert(!!FORMAT_DATA_FIRST->rx_ring ==
624                       !!packet->trace->started);
625                /* If we don't have a ring its already been destroyed */
626                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
627                        ring_release_frame(packet->trace, packet);
628                else
629                        packet->buffer = NULL;
630        }
631}
632
633static int linuxring_write_packet(libtrace_out_t *libtrace,
634                                  libtrace_packet_t *packet)
635{
636        struct tpacket2_hdr *header;
637        struct pollfd pollset;
638        struct socket_addr;
639        int ret;
640        unsigned max_size;
641        void * off;
642
643        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
644                return 0;
645
646        max_size = FORMAT_DATA_OUT->req.tp_frame_size -
647                TPACKET2_HDRLEN + sizeof(struct sockaddr_ll);
648
649        header = (void *)FORMAT_DATA_OUT->tx_ring +
650                (FORMAT_DATA_OUT->txring_offset *
651                 FORMAT_DATA_OUT->req.tp_frame_size);
652
653        while(header->tp_status != TP_STATUS_AVAILABLE) {
654                /* if none available: wait on more data */
655                pollset.fd = FORMAT_DATA_OUT->fd;
656                pollset.events = POLLOUT;
657                pollset.revents = 0;
658                ret = poll(&pollset, 1, 1000);
659                if (ret < 0 && errno != EINTR) {
660                        perror("poll");
661                        return -1;
662                }
663                if(ret == 0) {
664                        /* Timeout something has gone wrong - maybe the queue is
665                         * to large so try issue another send command
666                         */
667                        ret = sendto(FORMAT_DATA_OUT->fd,
668                                     NULL,
669                                     0,
670                                     0,
671                                     (void *)&FORMAT_DATA_OUT->sock_hdr,
672                                     sizeof(FORMAT_DATA_OUT->sock_hdr));
673                        if (ret < 0) {
674                                trace_set_err_out(libtrace, errno,
675                                                  "sendto after timeout "
676                                                  "failed");
677                                return -1;
678                        }
679                }
680        }
681
682        header->tp_len = trace_get_capture_length(packet);
683
684        /* We cannot write the whole packet so just write part of it */
685        if (header->tp_len > max_size)
686                header->tp_len = max_size;
687
688        /* Fill packet - no sockaddr_ll in header when writing to the TX_RING */
689        off = ((void *)header) + (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll));
690        memcpy(off, (char *)packet->payload, header->tp_len);
691
692        /* 'Send it' and increase ring pointer to the next frame */
693        header->tp_status = TP_STATUS_SEND_REQUEST;
694        FORMAT_DATA_OUT->txring_offset = (FORMAT_DATA_OUT->txring_offset + 1) %
695                FORMAT_DATA_OUT->req.tp_frame_nr;
696
697        /* Notify kernel there are frames to send */
698        FORMAT_DATA_OUT->queue ++;
699        FORMAT_DATA_OUT->queue %= TX_MAX_QUEUE;
700        if(FORMAT_DATA_OUT->queue == 0){
701                ret = sendto(FORMAT_DATA_OUT->fd,
702                                NULL,
703                                0,
704                                MSG_DONTWAIT,
705                                (void *)&FORMAT_DATA_OUT->sock_hdr,
706                                sizeof(FORMAT_DATA_OUT->sock_hdr));
707                if (ret < 0) {
708                        trace_set_err_out(libtrace, errno, "sendto failed");
709                        return -1;
710                }
711        }
712        return header->tp_len;
713
714}
715
716static void linuxring_help(void)
717{
718        printf("linuxring format module: $Revision: 1793 $\n");
719        printf("Supported input URIs:\n");
720        printf("\tring:eth0\n");
721        printf("\n");
722        printf("Supported output URIs:\n");
723        printf("\tring:eth0\n");
724        printf("\n");
725        return;
726}
727
728static struct libtrace_format_t linuxring = {
729        "ring",
730        "$Id$",
731        TRACE_FORMAT_LINUX_RING,
732        linuxcommon_probe_filename,     /* probe filename */
733        NULL,                           /* probe magic */
734        linuxcommon_init_input,         /* init_input */
735        linuxcommon_config_input,       /* config_input */
736        linuxring_start_input,          /* start_input */
737        linuxcommon_pause_input,        /* pause_input */
738        linuxcommon_init_output,        /* init_output */
739        NULL,                           /* config_output */
740        linuxring_start_output,         /* start_ouput */
741        linuxcommon_fin_input,          /* fin_input */
742        linuxring_fin_output,           /* fin_output */
743        linuxring_read_packet,          /* read_packet */
744        linuxring_prepare_packet,       /* prepare_packet */
745        linuxring_fin_packet,           /* fin_packet */
746        linuxring_write_packet,         /* write_packet */
747        linuxring_get_link_type,        /* get_link_type */
748        linuxring_get_direction,        /* get_direction */
749        linuxring_set_direction,        /* set_direction */
750        NULL,                           /* get_erf_timestamp */
751        linuxring_get_timeval,          /* get_timeval */
752        linuxring_get_timespec,         /* get_timespec */
753        NULL,                           /* get_seconds */
754        NULL,                           /* seek_erf */
755        NULL,                           /* seek_timeval */
756        NULL,                           /* seek_seconds */
757        linuxring_get_capture_length,   /* get_capture_length */
758        linuxring_get_wire_length,      /* get_wire_length */
759        linuxring_get_framing_length,   /* get_framing_length */
760        linuxring_set_capture_length,   /* set_capture_length */
761        NULL,                           /* get_received_packets */
762        NULL,                           /* get_filtered_packets */
763        NULL,                           /* get_dropped_packets */
764        linuxcommon_get_statistics,     /* get_statistics */
765        linuxcommon_get_fd,             /* get_fd */
766        linuxring_event,                /* trace_event */
767        linuxring_help,                 /* help */
768        NULL,                           /* next pointer */
769#ifdef HAVE_PACKET_FANOUT
770        {true, -1},                     /* Live, no thread limit */
771        linuxring_pstart_input,         /* pstart_input */
772        linuxring_pread_packets,        /* pread_packets */
773        linuxcommon_pause_input,        /* ppause */
774        linuxcommon_fin_input,          /* p_fin */
775        linuxcommon_pregister_thread,   /* register thread */
776        NULL,                           /* unregister thread */
777        NULL                            /* get thread stats */
778#else
779        NON_PARALLEL(true)
780#endif
781};
782#else /* HAVE_NETPACKET_PACKET_H */
783
784static void linuxring_help(void)
785{
786        printf("linuxring format module: $Revision: 1793 $\n");
787        printf("Not supported on this host\n");
788}
789
790static struct libtrace_format_t linuxring = {
791        "ring",
792        "$Id$",
793        TRACE_FORMAT_LINUX_RING,
794        NULL,                           /* probe filename */
795        NULL,                           /* probe magic */
796        NULL,                           /* init_input */
797        NULL,                           /* config_input */
798        NULL,                           /* start_input */
799        NULL,                           /* pause_input */
800        NULL,                           /* init_output */
801        NULL,                           /* config_output */
802        NULL,                           /* start_ouput */
803        NULL,                           /* fin_input */
804        NULL,                           /* fin_output */
805        NULL,                           /* read_packet */
806        linuxring_prepare_packet,       /* prepare_packet */
807        NULL,                           /* fin_packet */
808        NULL,                           /* write_packet */
809        linuxring_get_link_type,        /* get_link_type */
810        linuxring_get_direction,        /* get_direction */
811        linuxring_set_direction,        /* set_direction */
812        NULL,                           /* get_erf_timestamp */
813        linuxring_get_timeval,          /* get_timeval */
814        linuxring_get_timespec,         /* get_timespec */
815        NULL,                           /* get_seconds */
816        NULL,                           /* seek_erf */
817        NULL,                           /* seek_timeval */
818        NULL,                           /* seek_seconds */
819        linuxring_get_capture_length,   /* get_capture_length */
820        linuxring_get_wire_length,      /* get_wire_length */
821        linuxring_get_framing_length,   /* get_framing_length */
822        linuxring_set_capture_length,   /* set_capture_length */
823        NULL,                           /* get_received_packets */
824        NULL,                           /* get_filtered_packets */
825        NULL,                           /* get_dropped_packets */
826        linuxcommon_get_statistics,     /* get_statistics */
827        NULL,                           /* get_fd */
828        NULL,                           /* trace_event */
829        linuxring_help,                 /* help */
830        NULL,                           /* next pointer */
831        NON_PARALLEL(true)
832};
833#endif /* HAVE_NETPACKET_PACKET_H */
834
835/* TODO: Figure out how to give this format preference over the linux native
836 * formate if the user only specifies an interface */
837void linuxring_constructor(void)
838{
839        register_format(&linuxring);
840}
Note: See TracBrowser for help on using the repository browser.