source: lib/format_linux_ring.c @ ebed638

developringdecrementfixringperformance
Last change on this file since ebed638 was ebed638, checked in by Shane Alcock <salcock@…>, 2 years ago

Don't munmap ring rx buffers until last possible moment.

As soon as we munmap the buffers, any packets contained within
those buffers become invalid -- however, the user application may
still have references to those packets and could try to operate
on them without knowing that the memory holding the packet payload
is invalid.

Previously, the buffers were munmapped when the trace was paused,
which was causing crashes and invalid memory accesses.

Instead, we now only munmap the buffer if either the trace is
restarted or destroyed, so the packets will remain valid for
longer. Also, the new "start" iteration counting added in an
earlier commit means we can now recognise when a packet belongs
to an unmapped buffer and therefore return appropriate errors if
a user tries to interact with the packet.

This also fixes the assertion failure when freeing a ring packet
after the ring trace has been paused, i.e. after a call to
trace_pstop().

  • Property mode set to 100644
File size: 25.4 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27/* This format module deals with using the Linux Ring capture format (also
28 * known as PACKET_MMAP).
29 *
30 * Linux Ring is a LIVE capture format.
31 *
32 * This format also supports writing which will write packets out to the
33 * network as a form of packet replay. This should not be confused with the
34 * RT protocol which is intended to transfer captured packet records between
35 * RT-speaking programs.
36 */
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include <stdlib.h>
44#include <errno.h>
45#include <unistd.h>
46#include <string.h>
47#include <assert.h>
48
49#ifdef HAVE_INTTYPES_H
50#  include <inttypes.h>
51#else
52# error "Can't find inttypes.h"
53#endif
54
55#include "format_linux_common.h"
56
57/* Get the start of the captured data. I'm not sure if tp_mac (link layer) is
58 * always guaranteed. If it's not there then just use tp_net.
59 */
60#define TP_TRACE_START(mac, net, hdrend) \
61        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
62
63static pthread_mutex_t pagesize_mutex;
64#ifdef HAVE_NETPACKET_PACKET_H
65/* Get current frame in the ring buffer*/
66#define GET_CURRENT_BUFFER(stream) \
67        ((void *)stream->rx_ring +                              \
68         (stream->rxring_offset *                               \
69          stream->req.tp_frame_size))
70
71/* Cached page size, the page size shouldn't be changing */
72static int pagesize = 0;
73
74
75/*
76 * Try figure out the best sizes for the ring buffer. Ensure that:
77 * - max(Block_size) == page_size << max_order
78 * - Frame_size == page_size << x (so that block_size%frame_size == 0)
79 *   This means that there will be no wasted space between blocks
80 * - Frame_size < block_size
81 * - Frame_size is as close as possible to LIBTRACE_PACKET_BUFSIZE, but not
82 *   bigger
83 * - Frame_nr = Block_nr * (frames per block)
84 * - CONF_RING_FRAMES is used a minimum number of frames to hold
85 * - Calculates based on max_order and buf_min
86 */
87static void calculate_buffers(struct tpacket_req * req, int fd, char * uri,
88                uint32_t max_order)
89{
90        struct ifreq ifr;
91        unsigned max_frame = LIBTRACE_PACKET_BUFSIZE;
92        pthread_mutex_lock(&pagesize_mutex);
93        if (pagesize == 0) {
94                pagesize = getpagesize();
95        }
96        pthread_mutex_unlock(&pagesize_mutex);
97
98        strcpy(ifr.ifr_name, uri);
99        /* Don't bother trying to set frame size above mtu linux will drop
100         * these anyway.
101         *
102         * Remember, that our frame also has to include a TPACKET header!
103         */
104        if (ioctl(fd, SIOCGIFMTU, (caddr_t)&ifr) >= 0)
105                max_frame = ifr.ifr_mtu + TPACKET_ALIGN(TPACKET2_HDRLEN);
106        if (max_frame > LIBTRACE_PACKET_BUFSIZE)
107                max_frame = LIBTRACE_PACKET_BUFSIZE;
108
109        /* Calculate frame size */
110        req->tp_frame_size = pagesize;
111        while (req->tp_frame_size < max_frame &&
112              req->tp_frame_size < LIBTRACE_PACKET_BUFSIZE) {
113                req->tp_frame_size <<= 1;
114        }
115        if (req->tp_frame_size > LIBTRACE_PACKET_BUFSIZE)
116                req->tp_frame_size >>= 1;
117
118        /* Calculate block size */
119        req->tp_block_size = pagesize << max_order;
120        /* If max order is too high this might become 0 */
121        if (req->tp_block_size == 0) {
122                calculate_buffers(req, fd, uri, max_order-1);
123                return;
124        }
125        do {
126                req->tp_block_size >>= 1;
127        } while ((CONF_RING_FRAMES * req->tp_frame_size) <= req->tp_block_size);
128        req->tp_block_size <<= 1;
129
130        /* Calculate number of blocks */
131        req->tp_block_nr = (CONF_RING_FRAMES * req->tp_frame_size)
132                / req->tp_block_size;
133        if((CONF_RING_FRAMES * req->tp_frame_size) % req->tp_block_size != 0)
134                req->tp_block_nr++;
135
136        /* Calculate packets such that we use all the space we have to
137         * allocated */
138        req->tp_frame_nr = req->tp_block_nr *
139                (req->tp_block_size / req->tp_frame_size);
140
141        /*
142        printf("MaxO 0x%x BS 0x%x BN 0x%x FS 0x%x FN 0x%x\n",
143                max_order,
144                req->tp_block_size,
145                req->tp_block_nr,
146                req->tp_frame_size,
147                req->tp_frame_nr);
148        */
149
150        /* In case we have some silly values*/
151        assert(req->tp_block_size);
152        assert(req->tp_block_nr);
153        assert(req->tp_frame_size);
154        assert(req->tp_frame_nr);
155        assert(req->tp_block_size % req->tp_frame_size == 0);
156}
157
158static inline int socket_to_packetmmap(char * uridata, int ring_type,
159                                        int fd,
160                                        struct tpacket_req * req,
161                                        char ** ring_location,
162                                        uint32_t *max_order,
163                                        char *error) {
164        int val;
165
166        /* Switch to TPACKET header version 2, we only try support v2 because
167         * v1 had problems with data type consistancy */
168        val = TPACKET_V2;
169        if (setsockopt(fd,
170                       SOL_PACKET,
171                       PACKET_VERSION,
172                       &val,
173                       sizeof(val)) == -1) {
174                strncpy(error, "TPACKET2 not supported", 2048);
175                return -1;
176        }
177
178        /* Try switch to a ring buffer. If it fails we assume the the kernel
179         * cannot allocate a block of that size, so decrease max_block and
180         * retry.
181         */
182        while(1) {
183                if (*max_order <= 0) {
184                        strncpy(error,
185                                "Cannot allocate enough memory for ring buffer",
186                                2048);
187                        return -1;
188                }
189                calculate_buffers(req, fd, uridata, *max_order);
190                if (setsockopt(fd,
191                               SOL_PACKET,
192                               ring_type,
193                               req,
194                               sizeof(struct tpacket_req)) == -1) {
195                        if(errno == ENOMEM) {
196                                (*max_order)--;
197                        } else {
198                                strncpy(error,
199                                        "Error setting the ring buffer size",
200                                        2048);
201                                return -1;
202                        }
203
204                } else break;
205        }
206
207        /* Map the ring buffer into userspace */
208        *ring_location = mmap(NULL,
209                              req->tp_block_size * req->tp_block_nr,
210                              PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
211        if(*ring_location == MAP_FAILED) {
212                strncpy(error, "Failed to map memory for ring buffer", 2048);
213                return -1;
214        }
215
216        return 0;
217}
218
219/* Release a frame back to the kernel or free() if it's a malloc'd buffer
220 */
221inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
222                                      libtrace_packet_t *packet)
223{
224        /* Free the old packet */
225        if(packet->buffer == NULL)
226                return;
227
228        if(packet->buf_control == TRACE_CTRL_PACKET){
229                free(packet->buffer);
230                packet->buffer = NULL;
231        }
232
233        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
234                //struct linux_format_data_t *ftd = FORMAT_DATA;
235                /* Check it's within our buffer first - consider the pause
236                 * resume case it might have already been free'd lets hope we
237                 * get another buffer */
238                // TODO: For now let any one free anything
239                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
240                                (char *) ftd->rx_ring,
241                                ftd->rx_ring +
242                                ftd->req.tp_block_size *
243                                ftd->req.tp_block_nr)){*/
244                TO_TP_HDR2(packet->buffer)->tp_status = 0;
245                packet->buffer = NULL;
246                /*}*/
247        }
248}
249
250static inline int linuxring_start_input_stream(libtrace_t *libtrace,
251                                               struct linux_per_stream_t *stream) {
252        char error[2048];
253
254        /* Unmap any previous ring buffers associated with this stream. */
255        if (stream->rx_ring != MAP_FAILED) {
256                munmap(stream->rx_ring, stream->req.tp_block_size *
257                                stream->req.tp_block_nr);
258                stream->rx_ring = MAP_FAILED;
259                stream->rxring_offset = 0;
260        }
261
262
263        /* We set the socket up the same and then convert it to PACKET_MMAP */
264        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
265                return -1;
266
267        strncpy(error, "No known error", 2048);
268
269        /* Make it a packetmmap */
270        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
271                                stream->fd,
272                                &stream->req,
273                                &stream->rx_ring,
274                                &FORMAT_DATA->max_order,
275                                error) != 0) {
276                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
277                              "Initialisation of packet MMAP failed: %s",
278                              error);
279                linuxcommon_close_input_stream(libtrace, stream);
280                return -1;
281        }
282
283        return 0;
284}
285
286static int linuxring_fin_input(libtrace_t *libtrace) {
287        size_t i;
288
289        if (libtrace->format_data) {
290                for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
291                        struct linux_per_stream_t *stream;
292                        stream = libtrace_list_get_index(
293                                FORMAT_DATA->per_stream, i)->data;
294                        if (stream->rx_ring != MAP_FAILED) {
295                                munmap(stream->rx_ring,
296                                                stream->req.tp_block_size *
297                                                stream->req.tp_block_nr);
298                        }
299                }
300
301                if (FORMAT_DATA->filter != NULL)
302                        free(FORMAT_DATA->filter);
303
304                if (FORMAT_DATA->per_stream)
305                        libtrace_list_deinit(FORMAT_DATA->per_stream);
306
307                free(libtrace->format_data);
308        }
309
310        return 0;
311}
312
313
314static int linuxring_start_input(libtrace_t *libtrace)
315{
316        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
317        return ret;
318}
319
320#ifdef HAVE_PACKET_FANOUT
321static int linuxring_pstart_input(libtrace_t *libtrace) {
322        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
323}
324#endif
325
326static int linuxring_start_output(libtrace_out_t *libtrace)
327{
328        char error[2048];
329        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
330        if (FORMAT_DATA_OUT->fd==-1) {
331                free(FORMAT_DATA_OUT);
332                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
333                return -1;
334        }
335
336        /* Make it a packetmmap */
337        if(socket_to_packetmmap(libtrace->uridata, PACKET_TX_RING,
338                                FORMAT_DATA_OUT->fd,
339                                &FORMAT_DATA_OUT->req,
340                                &FORMAT_DATA_OUT->tx_ring,
341                                &FORMAT_DATA_OUT->max_order,
342                                error) != 0) {
343                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED,
344                                  "Initialisation of packet MMAP failed: %s",
345                                  error);
346                close(FORMAT_DATA_OUT->fd);
347                free(FORMAT_DATA_OUT);
348                libtrace->format_data = NULL;
349                return -1;
350        }
351
352        FORMAT_DATA_OUT->sock_hdr.sll_family = AF_PACKET;
353        FORMAT_DATA_OUT->sock_hdr.sll_protocol = 0;
354        FORMAT_DATA_OUT->sock_hdr.sll_ifindex =
355                if_nametoindex(libtrace->uridata);
356        FORMAT_DATA_OUT->sock_hdr.sll_hatype = 0;
357        FORMAT_DATA_OUT->sock_hdr.sll_pkttype = 0;
358        FORMAT_DATA_OUT->sock_hdr.sll_halen = 0;
359        FORMAT_DATA_OUT->queue = 0;
360
361        return 0;
362}
363
364static int linuxring_fin_output(libtrace_out_t *libtrace)
365{
366        /* Make sure any remaining frames get sent */
367        sendto(FORMAT_DATA_OUT->fd,
368               NULL,
369               0,
370               0,
371               (void *) &FORMAT_DATA_OUT->sock_hdr,
372               sizeof(FORMAT_DATA_OUT->sock_hdr));
373
374        /* Unmap our data area */
375        munmap(FORMAT_DATA_OUT->tx_ring,
376               FORMAT_DATA_OUT->req.tp_block_size *
377               FORMAT_DATA_OUT->req.tp_block_nr);
378
379        /* Free the socket */
380        close(FORMAT_DATA_OUT->fd);
381        FORMAT_DATA_OUT->fd=-1;
382        free(libtrace->format_data);
383        return 0;
384}
385#endif /* HAVE_NETPACKET_PACKET_H */
386
387static libtrace_linktype_t
388linuxring_get_link_type(const struct libtrace_packet_t *packet)
389{
390        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
391        return linuxcommon_get_link_type(linktype);
392}
393
394static libtrace_direction_t
395linuxring_get_direction(const struct libtrace_packet_t *packet) {
396        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
397                                         sll_pkttype);
398}
399
400static libtrace_direction_t
401linuxring_set_direction(libtrace_packet_t *packet,
402                        libtrace_direction_t direction) {
403        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
404}
405
406static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
407{
408        struct timeval tv;
409        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
410        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
411        return tv;
412}
413
414static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
415{
416        struct timespec ts;
417        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
418        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
419        return ts;
420}
421
422static int linuxring_get_capture_length(const libtrace_packet_t *packet)
423{
424        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
425}
426
427static int linuxring_get_wire_length(const libtrace_packet_t *packet)
428{
429        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
430
431        /* Include the missing FCS */
432        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
433                wirelen += 4;
434
435        return wirelen;
436}
437
438static int linuxring_get_framing_length(const libtrace_packet_t *packet)
439{
440        /*
441         * Need to make frame_length + capture_length = complete capture length
442         * so include alignment whitespace. So reverse calculate from packet.
443         */
444        return (char *)packet->payload - (char *)packet->buffer;
445}
446
447static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
448                                           size_t size)
449{
450        assert(packet);
451        if (size > trace_get_capture_length(packet)) {
452                /* We should avoid making a packet larger */
453                return trace_get_capture_length(packet);
454        }
455
456        /* Reset the cached capture length */
457        packet->capture_length = -1;
458
459        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
460
461        return trace_get_capture_length(packet);
462}
463
464static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
465                                    libtrace_packet_t *packet, void *buffer,
466                                    libtrace_rt_types_t rt_type, uint32_t flags)
467{
468        if (packet->buffer != buffer &&
469            packet->buf_control == TRACE_CTRL_PACKET) {
470                free(packet->buffer);
471        }
472
473        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
474                packet->buf_control = TRACE_CTRL_PACKET;
475        else
476                packet->buf_control = TRACE_CTRL_EXTERNAL;
477
478
479        packet->buffer = buffer;
480        packet->header = buffer;
481        packet->payload = (char *)buffer +
482                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
483                               TO_TP_HDR2(packet->header)->tp_net,
484                               TPACKET2_HDRLEN);
485        packet->type = rt_type;
486
487        return 0;
488}
489
490#ifdef HAVE_NETPACKET_PACKET_H
491#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
492/* We use TP_STATUS_LIBTRACE to ensure we don't loop back on ourself
493 * and read the same packet twice if an old packet has not yet been freed */
494#define TP_STATUS_LIBTRACE 0xFFFFFFFF
495
496inline static int linuxring_read_stream(libtrace_t *libtrace,
497                                        libtrace_packet_t *packet,
498                                        struct linux_per_stream_t *stream,
499                                        libtrace_message_queue_t *queue) {
500
501        struct tpacket2_hdr *header;
502        int ret;
503        unsigned int snaplen;
504        struct pollfd pollset[2];
505
506        packet->buf_control = TRACE_CTRL_EXTERNAL;
507        packet->type = TRACE_RT_DATA_LINUX_RING;
508       
509        /* Fetch the current frame */
510        header = GET_CURRENT_BUFFER(stream);
511        assert((((unsigned long) header) & (pagesize - 1)) == 0);
512
513        /* TP_STATUS_USER means that we can use the frame.
514         * When a slot does not have this flag set, the frame is not
515         * ready for consumption.
516         */
517        while (!(header->tp_status & TP_STATUS_USER) ||
518               header->tp_status == TP_STATUS_LIBTRACE) {
519                if ((ret=is_halted(libtrace)) != -1)
520                        return ret;
521                pollset[0].fd = stream->fd;
522                pollset[0].events = POLLIN;
523                pollset[0].revents = 0;
524                if (queue) {
525                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
526                        pollset[1].events = POLLIN;
527                        pollset[1].revents = 0;
528                }
529                /* Wait for more data or a message */
530                ret = poll(pollset, (queue ? 2 : 1), 500);
531                if (ret > 0) {
532                        if (pollset[0].revents == POLLIN)
533                                continue;
534                        else if (queue && pollset[1].revents == POLLIN)
535                                return READ_MESSAGE;
536                        else if (queue && pollset[1].revents) {
537                                /* Internal error */
538                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
539                                              "Message queue error %d poll()",
540                                              pollset[1].revents);
541                                return READ_ERROR;
542                        } else {
543                                /* Try get the error from the socket */
544                                int err = ENETDOWN;
545                                socklen_t len = sizeof(err);
546                                getsockopt(stream->fd, SOL_SOCKET, SO_ERROR,
547                                           &err, &len);
548                                trace_set_err(libtrace, err,
549                                              "Socket error revents=%d poll()",
550                                              pollset[0].revents);
551                                return READ_ERROR;
552                        }
553                } else if (ret < 0) {
554                        if (errno != EINTR) {
555                                trace_set_err(libtrace,errno,"poll()");
556                                return -1;
557                        }
558                } else {
559                        /* Poll timed out - check if we should exit on next loop */
560                        continue;
561                }
562        }
563        packet->buffer = header;
564        packet->trace = libtrace;
565       
566        header->tp_status = TP_STATUS_LIBTRACE;
567
568        /* If a snaplen was configured, automatically truncate the packet to
569         * the desired length.
570         */
571        snaplen=LIBTRACE_MIN(
572                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
573                        (int)FORMAT_DATA->snaplen);
574       
575        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
576
577        /* Move to next buffer */
578        stream->rxring_offset++;
579        stream->rxring_offset %= stream->req.tp_frame_nr;
580
581        packet->order = (((uint64_t)TO_TP_HDR2(packet->buffer)->tp_sec) << 32)
582                        + ((((uint64_t)TO_TP_HDR2(packet->buffer)->tp_nsec)
583                        << 32) / 1000000000);
584
585        if (packet->order <= stream->last_timestamp) {
586                packet->order = stream->last_timestamp + 1;
587        }
588
589        stream->last_timestamp = packet->order;
590
591        /* We just need to get prepare_packet to set all our packet pointers
592         * appropriately */
593        if (linuxring_prepare_packet(libtrace, packet, packet->buffer,
594                                packet->type, 0))
595                return -1;
596        return  linuxring_get_framing_length(packet) + 
597                                linuxring_get_capture_length(packet);
598
599}
600
601static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
602        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
603}
604
605#ifdef HAVE_PACKET_FANOUT
606static int linuxring_pread_packets(libtrace_t *libtrace,
607                                   libtrace_thread_t *t,
608                                   libtrace_packet_t *packets[],
609                                   UNUSED size_t nb_packets) {
610        /* For now just read one packet */
611        packets[0]->error = linuxring_read_stream(libtrace, packets[0],
612                                                  t->format_data, &t->messages);
613        if (packets[0]->error >= 1)
614                return 1;
615        else
616                return packets[0]->error;
617}
618#endif
619
620/* Non-blocking read */
621static libtrace_eventobj_t linuxring_event(libtrace_t *libtrace,
622                                           libtrace_packet_t *packet)
623{
624        struct tpacket2_hdr *header;
625        libtrace_eventobj_t event = {0,0,0.0,0};
626
627        /* We must free the old packet, otherwise select() will instantly
628         * return */
629        ring_release_frame(libtrace, packet);
630
631        /* Fetch the current frame */
632        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
633        if (header->tp_status & TP_STATUS_USER &&
634            header->tp_status != TP_STATUS_LIBTRACE) {
635                /* We have a frame waiting */
636                event.size = trace_read_packet(libtrace, packet);
637                event.type = TRACE_EVENT_PACKET;
638        } else {
639                /* Ok we don't have a packet waiting */
640                event.type = TRACE_EVENT_IOWAIT;
641                event.fd = FORMAT_DATA_FIRST->fd;
642        }
643
644        return event;
645}
646
647/**
648 * Free any resources being kept for this packet, Note: libtrace
649 * will ensure all fields are zeroed correctly.
650 */
651static void linuxring_fin_packet(libtrace_packet_t *packet)
652{
653        libtrace_t *libtrace = packet->trace;
654
655        if (packet->buffer == NULL)
656                return;
657        assert(packet->trace);
658
659        /* If we own the packet (i.e. it's not a copy), we need to free it */
660        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
661                /* If we don't have a ring its already been destroyed */
662                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
663                        ring_release_frame(packet->trace, packet);
664                else
665                        packet->buffer = NULL;
666        }
667}
668
669static int linuxring_write_packet(libtrace_out_t *libtrace,
670                                  libtrace_packet_t *packet)
671{
672        struct tpacket2_hdr *header;
673        struct pollfd pollset;
674        struct socket_addr;
675        int ret;
676        unsigned max_size;
677        void * off;
678
679        if (trace_get_link_type(packet) == TRACE_TYPE_NONDATA)
680                return 0;
681
682        max_size = FORMAT_DATA_OUT->req.tp_frame_size -
683                TPACKET2_HDRLEN + sizeof(struct sockaddr_ll);
684
685        header = (void *)FORMAT_DATA_OUT->tx_ring +
686                (FORMAT_DATA_OUT->txring_offset *
687                 FORMAT_DATA_OUT->req.tp_frame_size);
688
689        while(header->tp_status != TP_STATUS_AVAILABLE) {
690                /* if none available: wait on more data */
691                pollset.fd = FORMAT_DATA_OUT->fd;
692                pollset.events = POLLOUT;
693                pollset.revents = 0;
694                ret = poll(&pollset, 1, 1000);
695                if (ret < 0 && errno != EINTR) {
696                        perror("poll");
697                        return -1;
698                }
699                if(ret == 0) {
700                        /* Timeout something has gone wrong - maybe the queue is
701                         * to large so try issue another send command
702                         */
703                        ret = sendto(FORMAT_DATA_OUT->fd,
704                                     NULL,
705                                     0,
706                                     0,
707                                     (void *)&FORMAT_DATA_OUT->sock_hdr,
708                                     sizeof(FORMAT_DATA_OUT->sock_hdr));
709                        if (ret < 0) {
710                                trace_set_err_out(libtrace, errno,
711                                                  "sendto after timeout "
712                                                  "failed");
713                                return -1;
714                        }
715                }
716        }
717
718        header->tp_len = trace_get_capture_length(packet);
719
720        /* We cannot write the whole packet so just write part of it */
721        if (header->tp_len > max_size)
722                header->tp_len = max_size;
723
724        /* Fill packet - no sockaddr_ll in header when writing to the TX_RING */
725        off = ((void *)header) + (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll));
726        memcpy(off, (char *)packet->payload, header->tp_len);
727
728        /* 'Send it' and increase ring pointer to the next frame */
729        header->tp_status = TP_STATUS_SEND_REQUEST;
730        FORMAT_DATA_OUT->txring_offset = (FORMAT_DATA_OUT->txring_offset + 1) %
731                FORMAT_DATA_OUT->req.tp_frame_nr;
732
733        /* Notify kernel there are frames to send */
734        FORMAT_DATA_OUT->queue ++;
735        FORMAT_DATA_OUT->queue %= TX_MAX_QUEUE;
736        if(FORMAT_DATA_OUT->queue == 0){
737                ret = sendto(FORMAT_DATA_OUT->fd,
738                                NULL,
739                                0,
740                                MSG_DONTWAIT,
741                                (void *)&FORMAT_DATA_OUT->sock_hdr,
742                                sizeof(FORMAT_DATA_OUT->sock_hdr));
743                if (ret < 0) {
744                        trace_set_err_out(libtrace, errno, "sendto failed");
745                        return -1;
746                }
747        }
748        return header->tp_len;
749
750}
751
752static void linuxring_help(void)
753{
754        printf("linuxring format module: $Revision: 1793 $\n");
755        printf("Supported input URIs:\n");
756        printf("\tring:eth0\n");
757        printf("\n");
758        printf("Supported output URIs:\n");
759        printf("\tring:eth0\n");
760        printf("\n");
761        return;
762}
763
764static struct libtrace_format_t linuxring = {
765        "ring",
766        "$Id$",
767        TRACE_FORMAT_LINUX_RING,
768        linuxcommon_probe_filename,     /* probe filename */
769        NULL,                           /* probe magic */
770        linuxcommon_init_input,         /* init_input */
771        linuxcommon_config_input,       /* config_input */
772        linuxring_start_input,          /* start_input */
773        linuxcommon_pause_input,        /* pause_input */
774        linuxcommon_init_output,        /* init_output */
775        NULL,                           /* config_output */
776        linuxring_start_output,         /* start_ouput */
777        linuxring_fin_input,            /* fin_input */
778        linuxring_fin_output,           /* fin_output */
779        linuxring_read_packet,          /* read_packet */
780        linuxring_prepare_packet,       /* prepare_packet */
781        linuxring_fin_packet,           /* fin_packet */
782        linuxring_write_packet,         /* write_packet */
783        NULL,                           /* flush_output */
784        linuxring_get_link_type,        /* get_link_type */
785        linuxring_get_direction,        /* get_direction */
786        linuxring_set_direction,        /* set_direction */
787        NULL,                           /* get_erf_timestamp */
788        linuxring_get_timeval,          /* get_timeval */
789        linuxring_get_timespec,         /* get_timespec */
790        NULL,                           /* get_seconds */
791        NULL,                           /* seek_erf */
792        NULL,                           /* seek_timeval */
793        NULL,                           /* seek_seconds */
794        linuxring_get_capture_length,   /* get_capture_length */
795        linuxring_get_wire_length,      /* get_wire_length */
796        linuxring_get_framing_length,   /* get_framing_length */
797        linuxring_set_capture_length,   /* set_capture_length */
798        NULL,                           /* get_received_packets */
799        NULL,                           /* get_filtered_packets */
800        NULL,                           /* get_dropped_packets */
801        linuxcommon_get_statistics,     /* get_statistics */
802        linuxcommon_get_fd,             /* get_fd */
803        linuxring_event,                /* trace_event */
804        linuxring_help,                 /* help */
805        NULL,                           /* next pointer */
806#ifdef HAVE_PACKET_FANOUT
807        {true, -1},                     /* Live, no thread limit */
808        linuxring_pstart_input,         /* pstart_input */
809        linuxring_pread_packets,        /* pread_packets */
810        linuxcommon_pause_input,        /* ppause */
811        linuxcommon_fin_input,          /* p_fin */
812        linuxcommon_pregister_thread,   /* register thread */
813        NULL,                           /* unregister thread */
814        NULL                            /* get thread stats */
815#else
816        NON_PARALLEL(true)
817#endif
818};
819#else /* HAVE_NETPACKET_PACKET_H */
820
821static void linuxring_help(void)
822{
823        printf("linuxring format module: $Revision: 1793 $\n");
824        printf("Not supported on this host\n");
825}
826
827static struct libtrace_format_t linuxring = {
828        "ring",
829        "$Id$",
830        TRACE_FORMAT_LINUX_RING,
831        NULL,                           /* probe filename */
832        NULL,                           /* probe magic */
833        NULL,                           /* init_input */
834        NULL,                           /* config_input */
835        NULL,                           /* start_input */
836        NULL,                           /* pause_input */
837        NULL,                           /* init_output */
838        NULL,                           /* config_output */
839        NULL,                           /* start_ouput */
840        NULL,                           /* fin_input */
841        NULL,                           /* fin_output */
842        NULL,                           /* read_packet */
843        linuxring_prepare_packet,       /* prepare_packet */
844        NULL,                           /* fin_packet */
845        NULL,                           /* write_packet */
846        NULL,                           /* flush_output */
847        linuxring_get_link_type,        /* get_link_type */
848        linuxring_get_direction,        /* get_direction */
849        linuxring_set_direction,        /* set_direction */
850        NULL,                           /* get_erf_timestamp */
851        linuxring_get_timeval,          /* get_timeval */
852        linuxring_get_timespec,         /* get_timespec */
853        NULL,                           /* get_seconds */
854        NULL,                           /* seek_erf */
855        NULL,                           /* seek_timeval */
856        NULL,                           /* seek_seconds */
857        linuxring_get_capture_length,   /* get_capture_length */
858        linuxring_get_wire_length,      /* get_wire_length */
859        linuxring_get_framing_length,   /* get_framing_length */
860        linuxring_set_capture_length,   /* set_capture_length */
861        NULL,                           /* get_received_packets */
862        NULL,                           /* get_filtered_packets */
863        NULL,                           /* get_dropped_packets */
864        linuxcommon_get_statistics,     /* get_statistics */
865        NULL,                           /* get_fd */
866        NULL,                           /* trace_event */
867        linuxring_help,                 /* help */
868        NULL,                           /* next pointer */
869        NON_PARALLEL(true)
870};
871#endif /* HAVE_NETPACKET_PACKET_H */
872
873/* TODO: Figure out how to give this format preference over the linux native
874 * formate if the user only specifies an interface */
875void linuxring_constructor(void)
876{
877        pthread_mutex_init(&pagesize_mutex, NULL);
878        register_format(&linuxring);
879}
Note: See TracBrowser for help on using the repository browser.