source: lib/format_linux_ring.c @ fce4572

develop
Last change on this file since fce4572 was fce4572, checked in by Shane Alcock <salcock@…>, 22 months ago

Add more failure cases to "can_write" functions for some formats.

Specifically:

  • Avoid writing metadata packets to DPDK, ring and int
  • Make sure all formats avoid writing "content invalid" packets.
  • Add comments to remind us that erf meta <-> pcapng meta conversion might be worth adding at some point.
  • Add comment to remind us that erf meta should be writable via a DAG card (I think).
  • Property mode set to 100644
File size: 27.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27/* This format module deals with using the Linux Ring capture format (also
28 * known as PACKET_MMAP).
29 *
30 * Linux Ring is a LIVE capture format.
31 *
32 * This format also supports writing which will write packets out to the
33 * network as a form of packet replay. This should not be confused with the
34 * RT protocol which is intended to transfer captured packet records between
35 * RT-speaking programs.
36 */
37
38#include "config.h"
39#include "libtrace.h"
40#include "libtrace_int.h"
41#include "format_helper.h"
42#include "libtrace_arphrd.h"
43#include <stdlib.h>
44#include <errno.h>
45#include <unistd.h>
46#include <string.h>
47
48#ifdef HAVE_INTTYPES_H
49#  include <inttypes.h>
50#else
51# error "Can't find inttypes.h"
52#endif
53
54#include "format_linux_common.h"
55
56/* Get the start of the captured data. I'm not sure if tp_mac (link layer) is
57 * always guaranteed. If it's not there then just use tp_net.
58 */
59#define TP_TRACE_START(mac, net, hdrend) \
60        ((mac) > (hdrend) && (mac) < (net) ? (mac) : (net))
61
62static pthread_mutex_t pagesize_mutex;
63#ifdef HAVE_NETPACKET_PACKET_H
64/* Get current frame in the ring buffer*/
65#define GET_CURRENT_BUFFER(stream) \
66        ((void *)stream->rx_ring +                              \
67         (stream->rxring_offset *                               \
68          stream->req.tp_frame_size))
69
70/* Cached page size, the page size shouldn't be changing */
71static int pagesize = 0;
72
73static bool linuxring_can_write(libtrace_packet_t *packet) {
74        /* Get the linktype */
75        libtrace_linktype_t ltype = trace_get_link_type(packet);
76
77        if (ltype == TRACE_TYPE_CONTENT_INVALID) {
78                return false;
79        }
80        if (ltype == TRACE_TYPE_NONDATA) {
81                return false;
82        }
83        if (ltype == TRACE_TYPE_PCAPNG_META) {
84                return false;
85        }
86        if (ltype == TRACE_TYPE_ERF_META) {
87                return false;
88        }
89
90        return true;
91}
92
93/*
94 * Try figure out the best sizes for the ring buffer. Ensure that:
95 * - max(Block_size) == page_size << max_order
96 * - Frame_size == page_size << x (so that block_size%frame_size == 0)
97 *   This means that there will be no wasted space between blocks
98 * - Frame_size < block_size
99 * - Frame_size is as close as possible to LIBTRACE_PACKET_BUFSIZE, but not
100 *   bigger
101 * - Frame_nr = Block_nr * (frames per block)
102 * - CONF_RING_FRAMES is used a minimum number of frames to hold
103 * - Calculates based on max_order and buf_min
104 */
105static void calculate_buffers(struct tpacket_req * req, int fd, char * uri,
106                uint32_t max_order)
107{
108        struct ifreq ifr;
109        unsigned max_frame = LIBTRACE_PACKET_BUFSIZE;
110        pthread_mutex_lock(&pagesize_mutex);
111        if (pagesize == 0) {
112                pagesize = getpagesize();
113        }
114        pthread_mutex_unlock(&pagesize_mutex);
115
116        strcpy(ifr.ifr_name, uri);
117        /* Don't bother trying to set frame size above mtu linux will drop
118         * these anyway.
119         *
120         * Remember, that our frame also has to include a TPACKET header!
121         */
122        if (ioctl(fd, SIOCGIFMTU, (caddr_t)&ifr) >= 0)
123                max_frame = ifr.ifr_mtu + TPACKET_ALIGN(TPACKET2_HDRLEN);
124        if (max_frame > LIBTRACE_PACKET_BUFSIZE)
125                max_frame = LIBTRACE_PACKET_BUFSIZE;
126
127        /* Calculate frame size */
128        req->tp_frame_size = pagesize;
129        while (req->tp_frame_size < max_frame &&
130              req->tp_frame_size < LIBTRACE_PACKET_BUFSIZE) {
131                req->tp_frame_size <<= 1;
132        }
133        if (req->tp_frame_size > LIBTRACE_PACKET_BUFSIZE)
134                req->tp_frame_size >>= 1;
135
136        /* Calculate block size */
137        req->tp_block_size = pagesize << max_order;
138        /* If max order is too high this might become 0 */
139        if (req->tp_block_size == 0) {
140                calculate_buffers(req, fd, uri, max_order-1);
141                return;
142        }
143        do {
144                req->tp_block_size >>= 1;
145        } while ((CONF_RING_FRAMES * req->tp_frame_size) <= req->tp_block_size);
146        req->tp_block_size <<= 1;
147
148        /* Calculate number of blocks */
149        req->tp_block_nr = (CONF_RING_FRAMES * req->tp_frame_size)
150                / req->tp_block_size;
151        if((CONF_RING_FRAMES * req->tp_frame_size) % req->tp_block_size != 0)
152                req->tp_block_nr++;
153
154        /* Calculate packets such that we use all the space we have to
155         * allocated */
156        req->tp_frame_nr = req->tp_block_nr *
157                (req->tp_block_size / req->tp_frame_size);
158
159        /*
160        printf("MaxO 0x%x BS 0x%x BN 0x%x FS 0x%x FN 0x%x\n",
161                max_order,
162                req->tp_block_size,
163                req->tp_block_nr,
164                req->tp_frame_size,
165                req->tp_frame_nr);
166        */
167
168        /* In case we have some silly values*/
169        if (!req->tp_block_size) {
170                fprintf(stderr, "Unexpected value of zero for req->tp_block_size in calculate_buffers()\n");
171        }
172        if (!req->tp_block_nr) {
173                fprintf(stderr, "Unexpected value of zero for req->tp_block_nr in calculate_buffers()\n");
174        }
175        if (!req->tp_frame_size) {
176                fprintf(stderr, "Unexpected value of zero for req->tp_frame_size in calculate_buffers()\n");
177        }
178        if (!req->tp_frame_nr) {
179                fprintf(stderr, "Unexpected value of zero for req->tp_frame_nr in calculate_buffers()\n");
180        }
181        if (req->tp_block_size % req->tp_frame_size != 0) {
182                fprintf(stderr, "Unexpected value of zero for req->tp_block_size %% req->tp_frame_size in calculate_buffers()\n");
183        }
184}
185
186static inline int socket_to_packetmmap(char * uridata, int ring_type,
187                                        int fd,
188                                        struct tpacket_req * req,
189                                        char ** ring_location,
190                                        uint32_t *max_order,
191                                        char *error) {
192        int val;
193
194        /* Switch to TPACKET header version 2, we only try support v2 because
195         * v1 had problems with data type consistancy */
196        val = TPACKET_V2;
197        if (setsockopt(fd,
198                       SOL_PACKET,
199                       PACKET_VERSION,
200                       &val,
201                       sizeof(val)) == -1) {
202                strncpy(error, "TPACKET2 not supported", 2048);
203                return -1;
204        }
205
206        /* Try switch to a ring buffer. If it fails we assume the the kernel
207         * cannot allocate a block of that size, so decrease max_block and
208         * retry.
209         */
210        while(1) {
211                if (*max_order <= 0) {
212                        strncpy(error,
213                                "Cannot allocate enough memory for ring buffer",
214                                2048);
215                        return -1;
216                }
217                calculate_buffers(req, fd, uridata, *max_order);
218                if (setsockopt(fd,
219                               SOL_PACKET,
220                               ring_type,
221                               req,
222                               sizeof(struct tpacket_req)) == -1) {
223                        if(errno == ENOMEM) {
224                                (*max_order)--;
225                        } else {
226                                strncpy(error,
227                                        "Error setting the ring buffer size",
228                                        2048);
229                                return -1;
230                        }
231
232                } else break;
233        }
234
235        /* Map the ring buffer into userspace */
236        *ring_location = mmap(NULL,
237                              req->tp_block_size * req->tp_block_nr,
238                              PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
239        if(*ring_location == MAP_FAILED) {
240                strncpy(error, "Failed to map memory for ring buffer", 2048);
241                return -1;
242        }
243
244        return 0;
245}
246
247/* Release a frame back to the kernel or free() if it's a malloc'd buffer
248 */
249inline static void ring_release_frame(libtrace_t *libtrace UNUSED,
250                                      libtrace_packet_t *packet)
251{
252        /* Free the old packet */
253        if(packet->buffer == NULL)
254                return;
255
256        if(packet->buf_control == TRACE_CTRL_PACKET){
257                free(packet->buffer);
258                packet->buffer = NULL;
259        }
260
261        if(packet->buf_control == TRACE_CTRL_EXTERNAL) {
262                //struct linux_format_data_t *ftd = FORMAT_DATA;
263                /* Check it's within our buffer first - consider the pause
264                 * resume case it might have already been free'd lets hope we
265                 * get another buffer */
266                // TODO: For now let any one free anything
267                /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
268                                (char *) ftd->rx_ring,
269                                ftd->rx_ring +
270                                ftd->req.tp_block_size *
271                                ftd->req.tp_block_nr)){*/
272                TO_TP_HDR2(packet->buffer)->tp_status = 0;
273                packet->buffer = NULL;
274                /*}*/
275        }
276}
277
278static inline int linuxring_start_input_stream(libtrace_t *libtrace,
279                                               struct linux_per_stream_t *stream) {
280        char error[2048];
281
282        /* Unmap any previous ring buffers associated with this stream. */
283        if (stream->rx_ring != MAP_FAILED) {
284                munmap(stream->rx_ring, stream->req.tp_block_size *
285                                stream->req.tp_block_nr);
286                stream->rx_ring = MAP_FAILED;
287                stream->rxring_offset = 0;
288        }
289
290
291        /* We set the socket up the same and then convert it to PACKET_MMAP */
292        if (linuxcommon_start_input_stream(libtrace, stream) < 0)
293                return -1;
294
295        strncpy(error, "No known error", 2048);
296
297        /* Make it a packetmmap */
298        if(socket_to_packetmmap(libtrace->uridata, PACKET_RX_RING,
299                                stream->fd,
300                                &stream->req,
301                                &stream->rx_ring,
302                                &FORMAT_DATA->max_order,
303                                error) != 0) {
304                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
305                              "Initialisation of packet MMAP failed: %s",
306                              error);
307                linuxcommon_close_input_stream(libtrace, stream);
308                return -1;
309        }
310
311        return 0;
312}
313
314static int linuxring_fin_input(libtrace_t *libtrace) {
315        size_t i;
316
317        if (libtrace->format_data) {
318                for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
319                        struct linux_per_stream_t *stream;
320                        stream = libtrace_list_get_index(
321                                FORMAT_DATA->per_stream, i)->data;
322                        if (stream->rx_ring != MAP_FAILED) {
323                                munmap(stream->rx_ring,
324                                                stream->req.tp_block_size *
325                                                stream->req.tp_block_nr);
326                        }
327                }
328
329                if (FORMAT_DATA->filter != NULL)
330                        trace_destroy_filter(FORMAT_DATA->filter);
331
332                if (FORMAT_DATA->per_stream)
333                        libtrace_list_deinit(FORMAT_DATA->per_stream);
334
335                free(libtrace->format_data);
336        }
337
338        return 0;
339}
340
341
342static int linuxring_start_input(libtrace_t *libtrace)
343{
344        int ret = linuxring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
345        return ret;
346}
347
348#ifdef HAVE_PACKET_FANOUT
349static int linuxring_pstart_input(libtrace_t *libtrace) {
350        return linuxcommon_pstart_input(libtrace, linuxring_start_input_stream);
351}
352#endif
353
354static int linuxring_start_output(libtrace_out_t *libtrace)
355{
356        char error[2048];
357        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
358        if (FORMAT_DATA_OUT->fd==-1) {
359                free(FORMAT_DATA_OUT);
360                trace_set_err_out(libtrace, errno, "Failed to create raw socket");
361                return -1;
362        }
363
364        /* Make it a packetmmap */
365        if(socket_to_packetmmap(libtrace->uridata, PACKET_TX_RING,
366                                FORMAT_DATA_OUT->fd,
367                                &FORMAT_DATA_OUT->req,
368                                &FORMAT_DATA_OUT->tx_ring,
369                                &FORMAT_DATA_OUT->max_order,
370                                error) != 0) {
371                trace_set_err_out(libtrace, TRACE_ERR_INIT_FAILED,
372                                  "Initialisation of packet MMAP failed: %s",
373                                  error);
374                close(FORMAT_DATA_OUT->fd);
375                free(FORMAT_DATA_OUT);
376                libtrace->format_data = NULL;
377                return -1;
378        }
379
380        FORMAT_DATA_OUT->sock_hdr.sll_family = AF_PACKET;
381        FORMAT_DATA_OUT->sock_hdr.sll_protocol = 0;
382        FORMAT_DATA_OUT->sock_hdr.sll_ifindex =
383                if_nametoindex(libtrace->uridata);
384        FORMAT_DATA_OUT->sock_hdr.sll_hatype = 0;
385        FORMAT_DATA_OUT->sock_hdr.sll_pkttype = 0;
386        FORMAT_DATA_OUT->sock_hdr.sll_halen = 0;
387        FORMAT_DATA_OUT->queue = 0;
388
389        return 0;
390}
391
392static int linuxring_fin_output(libtrace_out_t *libtrace)
393{
394        /* Make sure any remaining frames get sent */
395        sendto(FORMAT_DATA_OUT->fd,
396               NULL,
397               0,
398               0,
399               (void *) &FORMAT_DATA_OUT->sock_hdr,
400               sizeof(FORMAT_DATA_OUT->sock_hdr));
401
402        /* Unmap our data area */
403        munmap(FORMAT_DATA_OUT->tx_ring,
404               FORMAT_DATA_OUT->req.tp_block_size *
405               FORMAT_DATA_OUT->req.tp_block_nr);
406
407        /* Free the socket */
408        close(FORMAT_DATA_OUT->fd);
409        FORMAT_DATA_OUT->fd=-1;
410        free(libtrace->format_data);
411        return 0;
412}
413#endif /* HAVE_NETPACKET_PACKET_H */
414
415static libtrace_linktype_t
416linuxring_get_link_type(const struct libtrace_packet_t *packet)
417{
418        uint16_t linktype = GET_SOCKADDR_HDR(packet->buffer)->sll_hatype;
419        return linuxcommon_get_link_type(linktype);
420}
421
422static libtrace_direction_t
423linuxring_get_direction(const struct libtrace_packet_t *packet) {
424        return linuxcommon_get_direction(GET_SOCKADDR_HDR(packet->buffer)->
425                                         sll_pkttype);
426}
427
428static libtrace_direction_t
429linuxring_set_direction(libtrace_packet_t *packet,
430                        libtrace_direction_t direction) {
431        return linuxcommon_set_direction(GET_SOCKADDR_HDR(packet->buffer), direction);
432}
433
434static struct timeval linuxring_get_timeval(const libtrace_packet_t *packet)
435{
436        struct timeval tv;
437        tv.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
438        tv.tv_usec = TO_TP_HDR2(packet->buffer)->tp_nsec / 1000;
439        return tv;
440}
441
442static struct timespec linuxring_get_timespec(const libtrace_packet_t *packet)
443{
444        struct timespec ts;
445        ts.tv_sec = TO_TP_HDR2(packet->buffer)->tp_sec;
446        ts.tv_nsec = TO_TP_HDR2(packet->buffer)->tp_nsec;
447        return ts;
448}
449
450static int linuxring_get_capture_length(const libtrace_packet_t *packet)
451{
452        return TO_TP_HDR2(packet->buffer)->tp_snaplen;
453}
454
455static int linuxring_get_wire_length(const libtrace_packet_t *packet)
456{
457        int wirelen = TO_TP_HDR2(packet->buffer)->tp_len;
458
459        /* Include the missing FCS */
460        if (trace_get_link_type(packet) == TRACE_TYPE_ETH)
461                wirelen += 4;
462
463        return wirelen;
464}
465
466static int linuxring_get_framing_length(const libtrace_packet_t *packet)
467{
468        /*
469         * Need to make frame_length + capture_length = complete capture length
470         * so include alignment whitespace. So reverse calculate from packet.
471         */
472        return (char *)packet->payload - (char *)packet->buffer;
473}
474
475static size_t linuxring_set_capture_length(libtrace_packet_t *packet,
476                                           size_t size)
477{
478        if (!packet) {
479                fprintf(stderr, "NULL packet passed into linuxring_set_capture_length()\n");
480                /* Return -1 on error? */
481                return ~0U;
482        }
483        if (size > trace_get_capture_length(packet)) {
484                /* We should avoid making a packet larger */
485                return trace_get_capture_length(packet);
486        }
487
488        /* Reset the cached capture length */
489        packet->cached.capture_length = -1;
490
491        TO_TP_HDR2(packet->buffer)->tp_snaplen = size;
492
493        return trace_get_capture_length(packet);
494}
495
496static int linuxring_prepare_packet(libtrace_t *libtrace UNUSED,
497                                    libtrace_packet_t *packet, void *buffer,
498                                    libtrace_rt_types_t rt_type, uint32_t flags)
499{
500        if (packet->buffer != buffer &&
501            packet->buf_control == TRACE_CTRL_PACKET) {
502                free(packet->buffer);
503        }
504
505        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER)
506                packet->buf_control = TRACE_CTRL_PACKET;
507        else
508                packet->buf_control = TRACE_CTRL_EXTERNAL;
509
510
511        packet->buffer = buffer;
512        packet->header = buffer;
513        packet->payload = (char *)buffer +
514                TP_TRACE_START(TO_TP_HDR2(packet->header)->tp_mac,
515                               TO_TP_HDR2(packet->header)->tp_net,
516                               TPACKET2_HDRLEN);
517        packet->type = rt_type;
518
519        return 0;
520}
521
522#ifdef HAVE_NETPACKET_PACKET_H
523#define LIBTRACE_MIN(a,b) ((a)<(b) ? (a) : (b))
524/* We use TP_STATUS_LIBTRACE to ensure we don't loop back on ourself
525 * and read the same packet twice if an old packet has not yet been freed */
526#define TP_STATUS_LIBTRACE 0xFFFFFFFF
527
528inline static int linuxring_read_stream(libtrace_t *libtrace,
529                                        libtrace_packet_t *packet,
530                                        struct linux_per_stream_t *stream,
531                                        libtrace_message_queue_t *queue,
532                                        uint8_t block) {
533
534        struct tpacket2_hdr *header;
535        int ret;
536        unsigned int snaplen;
537        struct pollfd pollset[2];
538
539        packet->buf_control = TRACE_CTRL_EXTERNAL;
540        packet->type = TRACE_RT_DATA_LINUX_RING;
541
542        /* Fetch the current frame */
543        header = GET_CURRENT_BUFFER(stream);
544        if ((((unsigned long) header) & (pagesize - 1)) != 0) {
545                trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Linux ring packet is not correctly "
546                        "aligned to page size in linux_read_string()");
547                return -1;
548        }
549
550        /* TP_STATUS_USER means that we can use the frame.
551         * When a slot does not have this flag set, the frame is not
552         * ready for consumption.
553         */
554        while (!(header->tp_status & TP_STATUS_USER) ||
555                        header->tp_status == TP_STATUS_LIBTRACE) {
556                if ((ret=is_halted(libtrace)) != -1)
557                        return ret;
558                if (!block) {
559                        return 0;
560                }
561
562                pollset[0].fd = stream->fd;
563                pollset[0].events = POLLIN;
564                pollset[0].revents = 0;
565                if (queue) {
566                        pollset[1].fd = libtrace_message_queue_get_fd(queue);
567                        pollset[1].events = POLLIN;
568                        pollset[1].revents = 0;
569                }
570                /* Wait for more data or a message */
571                ret = poll(pollset, (queue ? 2 : 1), 500);
572                if (ret > 0) {
573                        if (pollset[0].revents == POLLIN)
574                                continue;
575                        else if (queue && pollset[1].revents == POLLIN)
576                                return READ_MESSAGE;
577                        else if (queue && pollset[1].revents) {
578                                /* Internal error */
579                                trace_set_err(libtrace,TRACE_ERR_BAD_STATE,
580                                              "Message queue error %d poll()",
581                                              pollset[1].revents);
582                                return READ_ERROR;
583                        } else {
584                                /* Try get the error from the socket */
585                                int err = ENETDOWN;
586                                socklen_t len = sizeof(err);
587                                getsockopt(stream->fd, SOL_SOCKET, SO_ERROR,
588                                           &err, &len);
589                                trace_set_err(libtrace, err,
590                                              "Socket error revents=%d poll()",
591                                              pollset[0].revents);
592                                return READ_ERROR;
593                        }
594                } else if (ret < 0) {
595                        if (errno != EINTR) {
596                                trace_set_err(libtrace,errno,"poll()");
597                                return -1;
598                        }
599                } else {
600                        /* Poll timed out - check if we should exit on next loop */
601                        continue;
602                }
603        }
604        packet->buffer = header;
605        packet->trace = libtrace;
606       
607        header->tp_status = TP_STATUS_LIBTRACE;
608
609        /* If a snaplen was configured, automatically truncate the packet to
610         * the desired length.
611         */
612        snaplen=LIBTRACE_MIN(
613                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
614                        (int)FORMAT_DATA->snaplen);
615       
616        TO_TP_HDR2(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR2(packet->buffer)->tp_len);
617
618        /* Move to next buffer */
619        stream->rxring_offset++;
620        stream->rxring_offset %= stream->req.tp_frame_nr;
621
622        packet->order = (((uint64_t)TO_TP_HDR2(packet->buffer)->tp_sec) << 32)
623                        + ((((uint64_t)TO_TP_HDR2(packet->buffer)->tp_nsec)
624                        << 32) / 1000000000);
625
626        if (packet->order <= stream->last_timestamp) {
627                packet->order = stream->last_timestamp + 1;
628        }
629
630        stream->last_timestamp = packet->order;
631
632        /* We just need to get prepare_packet to set all our packet pointers
633         * appropriately */
634        if (linuxring_prepare_packet(libtrace, packet, packet->buffer,
635                                packet->type, 0))
636                return -1;
637        return  linuxring_get_framing_length(packet) + 
638                                linuxring_get_capture_length(packet);
639
640}
641
642static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
643        return linuxring_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL, 1);
644}
645
646#ifdef HAVE_PACKET_FANOUT
647static int linuxring_pread_packets(libtrace_t *libtrace,
648                                   libtrace_thread_t *t,
649                                   libtrace_packet_t *packets[],
650                                   size_t nb_packets) {
651        size_t i;
652        int ret;
653
654        for (i = 0; i < nb_packets; i++) {
655                ret = linuxring_read_stream(libtrace, packets[i],
656                                t->format_data, &t->messages, i == 0 ? 1 : 0);
657                packets[i]->error = ret;
658                if (ret < 0) {
659                        return ret;
660                }
661
662                if (ret == 0) {
663                        if (is_halted(libtrace) == READ_EOF) {
664                                return READ_EOF;
665                        }
666                        return i;
667                }
668        }
669
670        return nb_packets;
671}
672#endif
673
674/* Non-blocking read */
675static libtrace_eventobj_t linuxring_event(libtrace_t *libtrace,
676                                           libtrace_packet_t *packet)
677{
678        struct tpacket2_hdr *header;
679        libtrace_eventobj_t event = {0,0,0.0,0};
680
681        /* We must free the old packet, otherwise select() will instantly
682         * return */
683        ring_release_frame(libtrace, packet);
684
685        /* Fetch the current frame */
686        header = GET_CURRENT_BUFFER(FORMAT_DATA_FIRST);
687        if (header->tp_status & TP_STATUS_USER &&
688            header->tp_status != TP_STATUS_LIBTRACE) {
689                /* We have a frame waiting */
690                event.size = trace_read_packet(libtrace, packet);
691                event.type = TRACE_EVENT_PACKET;
692        } else {
693                /* Ok we don't have a packet waiting */
694                event.type = TRACE_EVENT_IOWAIT;
695                event.fd = FORMAT_DATA_FIRST->fd;
696        }
697
698        return event;
699}
700
701/**
702 * Free any resources being kept for this packet, Note: libtrace
703 * will ensure all fields are zeroed correctly.
704 */
705static void linuxring_fin_packet(libtrace_packet_t *packet)
706{
707        libtrace_t *libtrace = packet->trace;
708
709        if (packet->buffer == NULL)
710                return;
711        if (!packet->trace) {
712                fprintf(stderr, "Linux ring packet is not attached to a valid "
713                        "trace, Unable to release it, in linuxring_fin_packet()\n");
714                return;
715        }
716
717        /* If we own the packet (i.e. it's not a copy), we need to free it */
718        if (packet->buf_control == TRACE_CTRL_EXTERNAL) {
719                /* If we don't have a ring its already been destroyed */
720                if (FORMAT_DATA_FIRST->rx_ring != MAP_FAILED)
721                        ring_release_frame(packet->trace, packet);
722                else
723                        packet->buffer = NULL;
724        }
725}
726
727static int linuxring_write_packet(libtrace_out_t *libtrace,
728                                  libtrace_packet_t *packet)
729{
730        /* Check linuxring can write this type of packet */
731        if (!linuxring_can_write(packet)) {
732                return 0;
733        }
734
735        struct tpacket2_hdr *header;
736        struct pollfd pollset;
737        struct socket_addr;
738        int ret;
739        unsigned max_size;
740        void * off;
741
742        max_size = FORMAT_DATA_OUT->req.tp_frame_size -
743                TPACKET2_HDRLEN + sizeof(struct sockaddr_ll);
744
745        header = (void *)FORMAT_DATA_OUT->tx_ring +
746                (FORMAT_DATA_OUT->txring_offset *
747                 FORMAT_DATA_OUT->req.tp_frame_size);
748
749        while(header->tp_status != TP_STATUS_AVAILABLE) {
750                /* if none available: wait on more data */
751                pollset.fd = FORMAT_DATA_OUT->fd;
752                pollset.events = POLLOUT;
753                pollset.revents = 0;
754                ret = poll(&pollset, 1, 1000);
755                if (ret < 0 && errno != EINTR) {
756                        perror("poll");
757                        return -1;
758                }
759                if(ret == 0) {
760                        /* Timeout something has gone wrong - maybe the queue is
761                         * to large so try issue another send command
762                         */
763                        ret = sendto(FORMAT_DATA_OUT->fd,
764                                     NULL,
765                                     0,
766                                     0,
767                                     (void *)&FORMAT_DATA_OUT->sock_hdr,
768                                     sizeof(FORMAT_DATA_OUT->sock_hdr));
769                        if (ret < 0) {
770                                trace_set_err_out(libtrace, errno,
771                                                  "sendto after timeout "
772                                                  "failed");
773                                return -1;
774                        }
775                }
776        }
777
778        header->tp_len = trace_get_capture_length(packet);
779
780        /* We cannot write the whole packet so just write part of it */
781        if (header->tp_len > max_size)
782                header->tp_len = max_size;
783
784        /* Fill packet - no sockaddr_ll in header when writing to the TX_RING */
785        off = ((void *)header) + (TPACKET2_HDRLEN - sizeof(struct sockaddr_ll));
786        memcpy(off, (char *)packet->payload, header->tp_len);
787
788        /* 'Send it' and increase ring pointer to the next frame */
789        header->tp_status = TP_STATUS_SEND_REQUEST;
790        FORMAT_DATA_OUT->txring_offset = (FORMAT_DATA_OUT->txring_offset + 1) %
791                FORMAT_DATA_OUT->req.tp_frame_nr;
792
793        /* Notify kernel there are frames to send */
794        FORMAT_DATA_OUT->queue ++;
795        FORMAT_DATA_OUT->queue %= TX_MAX_QUEUE;
796        if(FORMAT_DATA_OUT->queue == 0){
797                ret = sendto(FORMAT_DATA_OUT->fd,
798                                NULL,
799                                0,
800                                MSG_DONTWAIT,
801                                (void *)&FORMAT_DATA_OUT->sock_hdr,
802                                sizeof(FORMAT_DATA_OUT->sock_hdr));
803                if (ret < 0) {
804                        trace_set_err_out(libtrace, errno, "sendto failed");
805                        return -1;
806                }
807        }
808        return header->tp_len;
809
810}
811
812static void linuxring_help(void)
813{
814        printf("linuxring format module: $Revision: 1793 $\n");
815        printf("Supported input URIs:\n");
816        printf("\tring:eth0\n");
817        printf("\n");
818        printf("Supported output URIs:\n");
819        printf("\tring:eth0\n");
820        printf("\n");
821        return;
822}
823
824static struct libtrace_format_t linuxring = {
825        "ring",
826        "$Id$",
827        TRACE_FORMAT_LINUX_RING,
828        linuxcommon_probe_filename,     /* probe filename */
829        NULL,                           /* probe magic */
830        linuxcommon_init_input,         /* init_input */
831        linuxcommon_config_input,       /* config_input */
832        linuxring_start_input,          /* start_input */
833        linuxcommon_pause_input,        /* pause_input */
834        linuxcommon_init_output,        /* init_output */
835        NULL,                           /* config_output */
836        linuxring_start_output,         /* start_ouput */
837        linuxring_fin_input,            /* fin_input */
838        linuxring_fin_output,           /* fin_output */
839        linuxring_read_packet,          /* read_packet */
840        linuxring_prepare_packet,       /* prepare_packet */
841        linuxring_fin_packet,           /* fin_packet */
842        linuxring_write_packet,         /* write_packet */
843        NULL,                           /* flush_output */
844        linuxring_get_link_type,        /* get_link_type */
845        linuxring_get_direction,        /* get_direction */
846        linuxring_set_direction,        /* set_direction */
847        NULL,                           /* get_erf_timestamp */
848        linuxring_get_timeval,          /* get_timeval */
849        linuxring_get_timespec,         /* get_timespec */
850        NULL,                           /* get_seconds */
851        NULL,                           /* seek_erf */
852        NULL,                           /* seek_timeval */
853        NULL,                           /* seek_seconds */
854        linuxring_get_capture_length,   /* get_capture_length */
855        linuxring_get_wire_length,      /* get_wire_length */
856        linuxring_get_framing_length,   /* get_framing_length */
857        linuxring_set_capture_length,   /* set_capture_length */
858        NULL,                           /* get_received_packets */
859        NULL,                           /* get_filtered_packets */
860        NULL,                           /* get_dropped_packets */
861        linuxcommon_get_statistics,     /* get_statistics */
862        linuxcommon_get_fd,             /* get_fd */
863        linuxring_event,                /* trace_event */
864        linuxring_help,                 /* help */
865        NULL,                           /* next pointer */
866#ifdef HAVE_PACKET_FANOUT
867        {true, -1},                     /* Live, no thread limit */
868        linuxring_pstart_input,         /* pstart_input */
869        linuxring_pread_packets,        /* pread_packets */
870        linuxcommon_pause_input,        /* ppause */
871        linuxcommon_fin_input,          /* p_fin */
872        linuxcommon_pregister_thread,   /* register thread */
873        NULL,                           /* unregister thread */
874        NULL                            /* get thread stats */
875#else
876        NON_PARALLEL(true)
877#endif
878};
879#else /* HAVE_NETPACKET_PACKET_H */
880
881static void linuxring_help(void)
882{
883        printf("linuxring format module: $Revision: 1793 $\n");
884        printf("Not supported on this host\n");
885}
886
887static struct libtrace_format_t linuxring = {
888        "ring",
889        "$Id$",
890        TRACE_FORMAT_LINUX_RING,
891        NULL,                           /* probe filename */
892        NULL,                           /* probe magic */
893        NULL,                           /* init_input */
894        NULL,                           /* config_input */
895        NULL,                           /* start_input */
896        NULL,                           /* pause_input */
897        NULL,                           /* init_output */
898        NULL,                           /* config_output */
899        NULL,                           /* start_ouput */
900        NULL,                           /* fin_input */
901        NULL,                           /* fin_output */
902        NULL,                           /* read_packet */
903        linuxring_prepare_packet,       /* prepare_packet */
904        NULL,                           /* fin_packet */
905        NULL,                           /* write_packet */
906        NULL,                           /* flush_output */
907        linuxring_get_link_type,        /* get_link_type */
908        linuxring_get_direction,        /* get_direction */
909        linuxring_set_direction,        /* set_direction */
910        NULL,                           /* get_erf_timestamp */
911        linuxring_get_timeval,          /* get_timeval */
912        linuxring_get_timespec,         /* get_timespec */
913        NULL,                           /* get_seconds */
914        NULL,                           /* seek_erf */
915        NULL,                           /* seek_timeval */
916        NULL,                           /* seek_seconds */
917        linuxring_get_capture_length,   /* get_capture_length */
918        linuxring_get_wire_length,      /* get_wire_length */
919        linuxring_get_framing_length,   /* get_framing_length */
920        linuxring_set_capture_length,   /* set_capture_length */
921        NULL,                           /* get_received_packets */
922        NULL,                           /* get_filtered_packets */
923        NULL,                           /* get_dropped_packets */
924        linuxcommon_get_statistics,     /* get_statistics */
925        NULL,                           /* get_fd */
926        NULL,                           /* trace_event */
927        linuxring_help,                 /* help */
928        NULL,                           /* next pointer */
929        NON_PARALLEL(true)
930};
931#endif /* HAVE_NETPACKET_PACKET_H */
932
933/* TODO: Figure out how to give this format preference over the linux native
934 * formate if the user only specifies an interface */
935void linuxring_constructor(void)
936{
937        pthread_mutex_init(&pagesize_mutex, NULL);
938        register_format(&linuxring);
939}
Note: See TracBrowser for help on using the repository browser.