source: lib/format_pfring.c @ e2cd7d8

pfring
Last change on this file since e2cd7d8 was e2cd7d8, checked in by Shane Alcock <salcock@…>, 7 years ago

More pfring improvements

Only byteswap if the byte-ordering of the pfring header doesn't
match the byte ordering of the host looking at it. It's a bit of a pain,
but we were losing quite a bit of performance by always byteswapping to
a standard byte order upon reading the packet.

Turns out the pfring API has a get_selectable_fd function so our event
API doesn't need to poll if no packets are available.

  • Property mode set to 100644
File size: 20.6 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 */
29
30#define _GNU_SOURCE
31#include "config.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "data-struct/linked_list.h"
36
37#include <stdlib.h>
38#include <assert.h>
39#include <unistd.h>
40#include <string.h>
41
42#if HAVE_LIBNUMA
43#include <numa.h>
44#endif
45
46#include <pthread.h>
47#ifdef __FreeBSD__
48#include <pthread_np.h>
49#endif
50
51#include <pfring.h>
52
53struct pfring_format_data_t {
54        libtrace_list_t *per_stream;   
55        int8_t promisc;
56        int snaplen;
57        int8_t ringenabled;
58        char *bpffilter;
59};
60
61struct pfring_per_stream_t {
62
63        pfring *pd;
64        int affinity;
65
66} ALIGN_STRUCT(CACHE_LINE_SIZE);
67
68#define ZERO_PFRING_STREAM {NULL, -1}
69
70#define DATA(x) ((struct pfring_format_data_t *)x->format_data)
71#define STREAM_DATA(x) ((struct pfring_per_stream_t *)x->data)
72
73#define FORMAT_DATA DATA(libtrace)
74#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
75#define FORMAT_DATA_FIRST ((struct pfring_per_stream_t *)FORMAT_DATA_HEAD->data)
76
77
78typedef union {
79        uint32_t ipv4;
80        uint8_t ipv6[16];
81} ip_addr_union;
82
83struct tunnelinfo {
84        uint32_t id;
85        uint8_t tunneledproto;
86        ip_addr_union tunnel_src;
87        ip_addr_union tunnel_dst;
88        uint16_t tunnel_srcport;
89        uint16_t tunnel_dstport;
90};
91
92struct pktoffset {
93        int16_t ethoffset;
94        int16_t vlanoffset;
95        int16_t l3offset;
96        int16_t l4offset;
97        int16_t payloadoffset;
98};
99
100struct parsing_info {
101        uint8_t dmac[ETH_ALEN];
102        uint8_t smac[ETH_ALEN];
103        uint16_t eth_type;
104        uint16_t vlan_id;
105        uint8_t ip_version;
106        uint8_t l3_proto;
107        uint8_t ip_tos;
108        ip_addr_union ip_src;
109        ip_addr_union ip_dst;
110        uint16_t l4_src_port;
111        uint16_t l4_dst_port;
112        struct {
113                uint8_t flags;
114                uint32_t seqno;
115                uint32_t ackno;
116        } tcp;
117        struct tunnelinfo tunnel;
118        uint16_t last_matched_plugin;
119        uint16_t last_matched_rule;
120        struct pktoffset offset;
121
122};
123
124struct libtrace_pfring_extend {
125
126        uint64_t ts_ns;
127        uint32_t flags;
128        uint8_t direction;
129        int32_t if_index;
130        uint32_t hash;
131        struct {
132                int bounce_iface;
133                void *reserved;
134        } tx;
135        uint16_t parsed_hdr_len;
136        struct parsing_info parsed;
137};
138
139struct local_pfring_header {
140        struct timeval ts;
141        uint32_t caplen;
142        uint32_t wlen;
143        struct libtrace_pfring_extend ext;     
144       
145};
146
147#define PFRING_BYTEORDER_BIGENDIAN 0
148#define PFRING_BYTEORDER_LITTLEENDIAN 1
149
150#if __BYTE_ORDER == __BIG_ENDIAN
151#define PFRING_MY_BYTEORDER PFRING_BYTEORDER_BIGENDIAN
152#else
153#define PFRING_MY_BYTEORDER PFRING_BYTEORDER_LITTLEENDIAN
154#endif
155
156
157struct libtrace_pfring_header {
158        uint8_t byteorder;
159        struct {
160                uint64_t tv_sec;
161                uint64_t tv_usec;
162        } ts;
163        uint32_t caplen;
164        uint32_t wlen;
165        struct libtrace_pfring_extend ext;     
166       
167};
168
169static inline int pfring_start_input_stream(libtrace_t *libtrace,
170                struct pfring_per_stream_t *stream) {
171
172        int rc;
173
174        if (FORMAT_DATA->bpffilter) {
175                rc = pfring_set_bpf_filter(stream->pd, FORMAT_DATA->bpffilter);
176                if (rc != 0) {
177                        trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
178                                "Failed to set BPF filter on pfring:");
179                        return -1;
180                }
181        }
182
183        if ((rc = pfring_set_socket_mode(stream->pd, recv_only_mode)) != 0) {
184                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
185                                "Failed to set recv only mode on pfring:");
186                return -1;
187        }
188
189        if (pfring_enable_ring(stream->pd) != 0) {
190                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 
191                        "Failed to enable the pfring");
192                return -1;
193        }
194       
195        return 0;
196
197}
198
199static inline uint32_t pfring_flags(libtrace_t *libtrace) {
200        uint32_t flags = PF_RING_TIMESTAMP | PF_RING_LONG_HEADER;
201        flags |= PF_RING_HW_TIMESTAMP;
202        flags |= PF_RING_DO_NOT_PARSE;
203
204        if (FORMAT_DATA->promisc > 0) 
205                flags |= PF_RING_PROMISC;
206        return flags;
207}       
208
209static int pfring_start_input(libtrace_t *libtrace) {
210        struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST;
211        int rc = 0;
212
213        if (libtrace->uridata == NULL) {
214                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 
215                                "Missing interface name from pfring: URI");
216                return -1;
217        }
218        if (FORMAT_DATA->ringenabled) {
219                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
220                        "Attempted to start a pfring: input that was already started!");
221                return -1;
222        }
223
224        stream->pd = pfring_open(libtrace->uridata, FORMAT_DATA->snaplen, 
225                pfring_flags(libtrace));
226        if (stream->pd == NULL) {
227                trace_set_err(libtrace, errno, "pfring_open failed: %s",
228                                strerror(errno));
229                return -1;
230        }
231
232        rc = pfring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
233        if (rc < 0)
234                return rc;     
235        FORMAT_DATA->ringenabled = 1;
236        return rc;
237}
238
239static int pfring_pstart_input(libtrace_t *libtrace) {
240        pfring *ring[MAX_NUM_RX_CHANNELS];
241        uint8_t channels;
242        struct pfring_per_stream_t empty = ZERO_PFRING_STREAM;
243        int i, iserror = 0;
244       
245        if (libtrace->uridata == NULL) {
246                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 
247                                "Missing interface name from pfring: URI");
248                return -1;
249        }
250        if (FORMAT_DATA->ringenabled) {
251                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
252                        "Attempted to start a pfring: input that was already started!");
253                return -1;
254        }
255
256        channels = pfring_open_multichannel(libtrace->uridata, 
257                        FORMAT_DATA->snaplen, pfring_flags(libtrace), ring);
258        if (channels <= 0) {
259                trace_set_err(libtrace, errno, 
260                                "pfring_open_multichannel failed: %s",
261                                strerror(errno));
262                return -1;
263        }
264
265        printf("got %u channels\n", channels);
266
267        if (libtrace->perpkt_thread_count < channels) {
268                fprintf(stderr, "WARNING: pfring interface has %u channels, "
269                                "but this libtrace program has only enough "
270                                "threads to read the first %u channels.",
271                                channels, libtrace->perpkt_thread_count);
272        }
273
274        if (channels < libtrace->perpkt_thread_count)
275                libtrace->perpkt_thread_count = channels;
276       
277
278        for (i = 0; i < channels; i++) {
279                struct pfring_per_stream_t *stream;
280                if (libtrace_list_get_size(FORMAT_DATA->per_stream)<=(size_t)i)
281                        libtrace_list_push_back(FORMAT_DATA->per_stream, &empty);
282
283                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
284                stream->pd = ring[i];
285                if (pfring_start_input_stream(libtrace, stream) != 0) {
286                        iserror = 1;
287                        break;
288                }
289        }
290
291        if (iserror) {
292                /* Error state: free any streams we managed to create */
293                for (i = i - 1; i >= 0; i--) {
294                        struct pfring_per_stream_t *stream;
295                        stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
296
297                        pfring_disable_ring(stream->pd);       
298                        pfring_remove_bpf_filter(stream->pd);
299                        pfring_close(stream->pd);
300                }
301                return -1;
302        }
303        FORMAT_DATA->ringenabled = 1;
304        return 0;
305}
306
307
308static int pfring_init_input(libtrace_t *libtrace) {
309
310        struct pfring_per_stream_t stream_data = ZERO_PFRING_STREAM;
311
312        libtrace->format_data = (struct pfring_format_data_t *)
313                malloc(sizeof(struct pfring_format_data_t));
314        assert(libtrace->format_data != NULL);
315
316        FORMAT_DATA->promisc = -1;
317        FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE;
318        FORMAT_DATA->per_stream = libtrace_list_init(sizeof(stream_data));
319        FORMAT_DATA->ringenabled = 0;
320        FORMAT_DATA->bpffilter = NULL;
321
322        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
323
324        return 0;
325}
326
327static int pfring_config_input(libtrace_t *libtrace, trace_option_t option,
328                void *data) {
329
330        switch (option) {
331                case TRACE_OPTION_SNAPLEN:
332                        FORMAT_DATA->snaplen = *(int *)data;
333                        return 0;
334                case TRACE_OPTION_PROMISC:
335                        FORMAT_DATA->promisc = *(int *)data;
336                        return 0;
337                case TRACE_OPTION_FILTER:
338                        FORMAT_DATA->bpffilter = strdup((char *)data);
339                        return 0;
340                case TRACE_OPTION_HASHER:
341                        /* We can do unidirectional hashing on hardware
342                         * by default, but symmetric hash requires the
343                         * extra ZC or DNA drivers. */
344                        switch (*((enum hasher_types *)data)) {
345                                case HASHER_UNIDIRECTIONAL:
346                                        return 0;
347                                case HASHER_BALANCE:
348                                case HASHER_CUSTOM:
349                                case HASHER_BIDIRECTIONAL:
350                                        return -1;
351                        }
352                        break;
353                case TRACE_OPTION_META_FREQ:
354                        break;
355                case TRACE_OPTION_EVENT_REALTIME:
356                        break;
357        }
358        return -1;
359}
360
361static int pfring_pause_input(libtrace_t *libtrace) {
362        size_t i;
363
364        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
365                struct pfring_per_stream_t *stream;
366                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
367                pfring_disable_ring(stream->pd);       
368                pfring_remove_bpf_filter(stream->pd);
369                pfring_close(stream->pd);
370        }
371
372        FORMAT_DATA->ringenabled = 0;
373        return 0;
374
375}
376
377static int pfring_fin_input(libtrace_t *libtrace) {
378
379        if (libtrace->format_data) {
380                if (FORMAT_DATA->bpffilter)
381                        free(FORMAT_DATA->bpffilter);
382                if (FORMAT_DATA->per_stream) 
383                        libtrace_list_deinit(FORMAT_DATA->per_stream);
384                free(libtrace->format_data);
385        }
386        return 0;
387}
388
389
390static int pfring_get_capture_length(const libtrace_packet_t *packet) {
391        struct libtrace_pfring_header *phdr;
392        uint32_t wlen, caplen;
393        phdr = (struct libtrace_pfring_header *)packet->header;
394
395        if (packet->payload == NULL)
396                return 0;
397
398        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
399                wlen = byteswap32(phdr->wlen);
400                caplen = byteswap32(phdr->caplen);
401        } else {
402                wlen = phdr->wlen;
403                caplen = phdr->caplen;
404        }
405
406        if (wlen < caplen)
407                return wlen;
408        return caplen;
409       
410}
411
412static int pfring_get_wire_length(const libtrace_packet_t *packet) {
413        struct libtrace_pfring_header *phdr;
414        phdr = (struct libtrace_pfring_header *)packet->header;
415        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
416                return byteswap32(phdr->wlen);
417        }
418        return phdr->wlen;
419}
420
421static int pfring_get_framing_length(UNUSED const libtrace_packet_t *packet) {
422        return sizeof(struct libtrace_pfring_header);
423}
424
425static int pfring_prepare_packet(libtrace_t *libtrace UNUSED, 
426                libtrace_packet_t *packet, void *buffer, 
427                libtrace_rt_types_t rt_type, uint32_t flags) {
428
429
430        if (packet->buffer != buffer && packet->buf_control == 
431                        TRACE_CTRL_PACKET) {
432                free(packet->buffer);
433        }
434
435        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
436                packet->buf_control = TRACE_CTRL_PACKET;
437        } else {
438                packet->buf_control = TRACE_CTRL_EXTERNAL;
439        }
440
441        packet->type = rt_type;
442        packet->buffer = buffer;
443        packet->header = buffer;
444        packet->payload = (buffer + sizeof(struct libtrace_pfring_header));
445
446        return 0;
447}
448
449static int pfring_read_generic(libtrace_t *libtrace, libtrace_packet_t *packet,
450                struct pfring_per_stream_t *stream, uint8_t block, 
451                libtrace_message_queue_t *queue)
452{
453
454        struct libtrace_pfring_header *hdr;
455        struct local_pfring_header local;
456        int rc;
457
458        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
459                packet->buffer = malloc((size_t)LIBTRACE_PACKET_BUFSIZE);
460                if (!packet->buffer) {
461                        trace_set_err(libtrace, errno, 
462                                "Cannot allocate memory for packet buffer");
463                        return -1;
464                }
465        }
466       
467        hdr = (struct libtrace_pfring_header *)packet->buffer;
468        do {
469                if ((rc = pfring_recv(stream->pd, (u_char **)&packet->payload, 
470                        0, (struct pfring_pkthdr *)&local, 0)) == -1)
471                {
472                        trace_set_err(libtrace, errno, "Failed to read packet from pfring:");
473                        return -1;
474                }
475
476                if (rc == 0) {
477                        if (queue && libtrace_message_queue_count(queue) > 0)
478                                return READ_MESSAGE;
479                        continue;
480                }
481                break;
482        } while (block);
483
484        if (rc == 0)
485                return 0;
486
487        /* Convert the header fields to network byte order so we can
488         * export them over RT safely. Also deal with 32 vs 64 bit
489         * timevals! */
490        //hdr->ts.tv_sec = bswap_host_to_le64((uint64_t)local.ts.tv_sec);
491        //hdr->ts.tv_usec = bswap_host_to_le64((uint64_t)local.ts.tv_usec);
492#if __BYTE_ORDER == __LITTLE_ENDIAN
493        hdr->byteorder = PFRING_BYTEORDER_LITTLEENDIAN;
494#else
495        hdr->byteorder = PFRING_BYTEORDER_BIGENDIAN;
496#endif
497
498/*
499        hdr->caplen = htonl(local.caplen);
500        hdr->wlen = htonl(local.wlen);
501        hdr->ext.ts_ns = bswap_host_to_le64(local.ext.ts_ns);
502        hdr->ext.flags = htonl(local.ext.flags);
503        hdr->ext.if_index = htonl(local.ext.if_index);
504        hdr->ext.hash = htonl(local.ext.hash);
505        hdr->ext.tx.bounce_iface = htonl(local.ext.tx.bounce_iface);
506        hdr->ext.parsed_hdr_len = htons(local.ext.parsed_hdr_len);
507*/
508        hdr->caplen = (local.caplen);
509        hdr->wlen = (local.wlen);
510        hdr->ext.ts_ns = (local.ext.ts_ns);
511        hdr->ext.flags = (local.ext.flags);
512        hdr->ext.if_index = (local.ext.if_index);
513        hdr->ext.hash = (local.ext.hash);
514        hdr->ext.tx.bounce_iface = (local.ext.tx.bounce_iface);
515        hdr->ext.parsed_hdr_len = (local.ext.parsed_hdr_len);
516        hdr->ext.direction = local.ext.direction;
517
518
519        /* I think we can ignore parsed as it will only be populated if
520         * we call pfring_parse_pkt (?)
521         */
522
523        packet->trace = libtrace;
524        packet->type = TRACE_RT_DATA_PFRING;
525        packet->header = packet->buffer;
526        packet->error = 1;
527
528        return pfring_get_capture_length(packet) + 
529                        pfring_get_framing_length(packet);
530
531}
532
533static int pfring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
534{
535        return pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 1, NULL);
536}
537
538static libtrace_linktype_t pfring_get_link_type(const libtrace_packet_t *packet UNUSED)
539{
540        return TRACE_TYPE_ETH;
541}
542
543static libtrace_direction_t lt_pfring_set_direction(libtrace_packet_t *packet,
544                libtrace_direction_t dir) {
545
546        struct libtrace_pfring_header *phdr;
547
548        phdr = (struct libtrace_pfring_header *)packet->header;
549        phdr->ext.direction = dir;
550        return dir;     
551}
552
553static libtrace_direction_t pfring_get_direction(
554                const libtrace_packet_t *packet) {
555
556        struct libtrace_pfring_header *phdr;
557        phdr = (struct libtrace_pfring_header *)packet->header;
558        return phdr->ext.direction;
559}
560
561static uint64_t pfring_get_erf_timestamp(const libtrace_packet_t *packet) {
562        uint64_t ts;
563        struct libtrace_pfring_header *phdr;
564        phdr = (struct libtrace_pfring_header *)packet->header;
565
566        if (phdr->ext.ts_ns) {
567                uint64_t tns;
568                if (phdr->byteorder == PFRING_MY_BYTEORDER)
569                        tns = phdr->ext.ts_ns;
570                else
571                        tns = byteswap64(phdr->ext.ts_ns);
572
573                ts = ((tns / 1000000000) << 32);
574                ts += ((tns % 1000000000) << 32) / 1000000000;
575        } else {
576                uint64_t sec, usec;
577                if (phdr->byteorder == PFRING_MY_BYTEORDER) {
578                        sec = (uint64_t)(phdr->ts.tv_sec);
579                        usec = (uint64_t)(phdr->ts.tv_usec);
580                } else {
581                        sec = (uint64_t)byteswap32(phdr->ts.tv_sec);
582                        usec = (uint64_t)byteswap32(phdr->ts.tv_usec);
583                }
584
585                ts = (sec << 32);
586                ts += ((usec << 32)/1000000);
587        }
588        return ts;
589               
590
591}
592static size_t pfring_set_capture_length(libtrace_packet_t *packet, size_t size)
593{
594        struct libtrace_pfring_header *phdr;
595        phdr = (struct libtrace_pfring_header *)packet->header;
596
597        if (size > trace_get_capture_length(packet)) {
598                /* Can't make a packet larger */
599                return trace_get_capture_length(packet);
600        }
601
602        packet->capture_length = -1;
603        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
604                phdr->caplen = byteswap32(size);
605        } else {
606                phdr->caplen = size;
607        }
608        return trace_get_capture_length(packet);
609}
610
611static void pfring_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
612
613        pfring_stat st;
614
615        size_t i;
616
617        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
618                struct pfring_per_stream_t *stream;
619                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
620
621                if (pfring_stats(stream->pd, &st) != 0) {
622                        trace_set_err(libtrace, errno, "Failed to get statistics for pfring stream %u", (uint32_t)i);
623                        continue;
624                }
625
626                if (stat->dropped_valid) {
627                        stat->dropped += st.drop;
628                } else {
629                        stat->dropped = st.drop;
630                        stat->dropped_valid = 1;
631                }
632
633                if (stat->received_valid) {
634                        stat->received += st.recv;
635                } else {
636                        stat->received = st.recv;
637                        stat->received_valid = 1;
638                }
639        }
640
641}
642
643static libtrace_eventobj_t pfring_event(libtrace_t *libtrace, 
644                libtrace_packet_t *packet) {
645
646        libtrace_eventobj_t event = {0,0,0.0,0};
647        int rc;
648
649        rc = pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 0, NULL);
650       
651        if (rc > 0) {
652                event.size = rc;
653                event.type = TRACE_EVENT_PACKET;
654        } else if (rc == 0) {
655                if (libtrace_halt) {
656                        event.type = TRACE_EVENT_TERMINATE;
657                } else {
658                        event.type = TRACE_EVENT_IOWAIT;
659                        event.fd = pfring_get_selectable_fd(FORMAT_DATA_FIRST->pd);
660                }
661        } else {
662                event.type = TRACE_EVENT_TERMINATE;
663        }
664        return event;
665}
666
667static int pfring_pread_packets(libtrace_t *libtrace,
668                libtrace_thread_t *t, 
669                libtrace_packet_t *packets[],
670                size_t nb_packets) {
671
672        size_t readpackets = 0;
673        int rc = 0;
674        struct pfring_per_stream_t *stream = (struct pfring_per_stream_t *)t->format_data;
675        uint8_t block = 1;
676
677        /* Block for the first packet, then read up to nb_packets if they
678         * are available. */
679        do {
680                rc = pfring_read_generic(libtrace, packets[readpackets], 
681                        stream, block, &t->messages);
682                if (rc == READ_MESSAGE) {
683                        if (readpackets == 0) {
684                                return rc;
685                        }
686                        break;
687                }
688                               
689                if (rc == READ_ERROR)
690                        return rc;
691
692                if (rc == 0)
693                        continue;
694               
695                block = 0;
696                readpackets ++;
697                if (readpackets >= nb_packets)
698                        break;
699
700        } while (rc != 0);
701
702        return readpackets;
703}
704
705static int pfring_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
706                bool reading) {
707
708        uint32_t cpus = trace_get_number_of_cores();
709
710        if (reading) {
711                struct pfring_per_stream_t *stream;
712                int tid = 0;
713                if (t->type == THREAD_PERPKT) {
714                        t->format_data = libtrace_list_get_index(FORMAT_DATA->per_stream, t->perpkt_num)->data;
715                        if (t->format_data == NULL) {
716                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
717                                                "Too many threads registered");
718                                return -1;
719                        }
720                        tid = t->perpkt_num;
721                } else {
722                        t->format_data = FORMAT_DATA_FIRST;
723                }
724
725                stream = t->format_data;
726                if (cpus > 1) {
727                        cpu_set_t cpuset;
728                        uint32_t coreid;
729                        int s;
730
731                        coreid = (tid + 1) % cpus;
732                        CPU_ZERO(&cpuset);
733                        CPU_SET(coreid, &cpuset);
734                        if ((s = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset)) != 0) {
735                                trace_set_err(libtrace, errno, "Warning "
736                                                "failed to set affinity for "
737                                                "pfring thread");
738                                return -1;
739                        }
740                        stream->affinity = coreid;
741                }
742        }
743
744        return 0;               
745
746}
747
748static struct libtrace_format_t pfringformat = {
749        "pfring",
750        "$Id$",
751        TRACE_FORMAT_PFRING,
752        NULL,                           /* probe filename */
753        NULL,                           /* probe magic */
754        pfring_init_input,                /* init_input */
755        pfring_config_input,              /* config_input */
756        pfring_start_input,               /* start_input */
757        pfring_pause_input,               /* pause_input */
758        NULL,                           /* init_output */
759        NULL,                           /* config_output */
760        NULL,                           /* start_output */
761        pfring_fin_input,                 /* fin_input */
762        NULL,                           /* fin_output */
763        pfring_read_packet,               /* read_packet */
764        pfring_prepare_packet,            /* prepare_packet */
765        NULL,                           /* fin_packet */
766        NULL,                             /* write_packet */
767        pfring_get_link_type,             /* get_link_type */
768        pfring_get_direction,             /* get_direction */
769        lt_pfring_set_direction,             /* set_direction */
770        pfring_get_erf_timestamp,         /* get_erf_timestamp */
771        NULL,               /* get_timeval */
772        NULL,                           /* get_seconds */
773        NULL,                           /* get_timespec */
774        NULL,                           /* seek_erf */
775        NULL,                           /* seek_timeval */
776        NULL,                           /* seek_seconds */
777        pfring_get_capture_length,        /* get_capture_length */
778        pfring_get_wire_length,           /* get_wire_length */
779        pfring_get_framing_length,        /* get_framing_length */
780        pfring_set_capture_length,        /* set_capture_length */
781        NULL,                           /* get_received_packets */
782        NULL,                           /* get_filtered_packets */
783        NULL,                           /* get_dropped_packets */
784        pfring_get_statistics,          /* get_statistics */
785        NULL,                           /* get_fd */
786        pfring_event,              /* trace_event */
787        NULL,                      /* help */
788        NULL,                   /* next pointer */
789        {true, MAX_NUM_RX_CHANNELS},         /* Live, with thread limit */
790        pfring_pstart_input,         /* pstart_input */
791        pfring_pread_packets,        /* pread_packets */
792        pfring_pause_input,        /* ppause */
793        pfring_fin_input,          /* p_fin */
794        pfring_pregister_thread,        /* register thread */ 
795        NULL,                           /* unregister thread */
796        NULL                            /* get thread stats */
797
798};
799
800void pfring_constructor(void) {
801        register_format(&pfringformat);
802}
Note: See TracBrowser for help on using the repository browser.