source: lib/format_pfring.c @ a21b45e

pfring
Last change on this file since a21b45e was a21b45e, checked in by Shane Alcock <salcock@…>, 7 years ago

Started on pfring_zc format

Doesn't compile yet, but needed to commit this so I could work on
some other things without worrying about losing this work.

  • Property mode set to 100644
File size: 26.1 KB
Line 
1/*
2 * This file is part of libtrace
3 *
4 * Copyright (c) 2007-2015 The University of Waikato, Hamilton,
5 * New Zealand.
6 *
7 * Author: Shane Alcock
8 *
9 * All rights reserved.
10 *
11 * This code has been developed by the University of Waikato WAND
12 * research group. For further information please see http://www.wand.net.nz/
13 *
14 * libtrace is free software; you can redistribute it and/or modify
15 * it under the terms of the GNU General Public License as published by
16 * the Free Software Foundation; either version 2 of the License, or
17 * (at your option) any later version.
18 *
19 * libtrace is distributed in the hope that it will be useful,
20 * but WITHOUT ANY WARRANTY; without even the implied warranty of
21 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22 * GNU General Public License for more details.
23 *
24 * You should have received a copy of the GNU General Public License
25 * along with libtrace; if not, write to the Free Software
26 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
27 *
28 */
29
30#define _GNU_SOURCE
31#include "config.h"
32#include "libtrace.h"
33#include "libtrace_int.h"
34#include "format_helper.h"
35#include "data-struct/linked_list.h"
36
37#include <stdlib.h>
38#include <assert.h>
39#include <unistd.h>
40#include <string.h>
41
42#if HAVE_LIBNUMA
43#include <numa.h>
44#endif
45
46#include <pthread.h>
47#ifdef __FreeBSD__
48#include <pthread_np.h>
49#endif
50
51#include <pfring.h>
52#include <pfring_zc.h>
53
54struct pfring_format_data_t {
55        libtrace_list_t *per_stream;   
56        int8_t promisc;
57        int snaplen;
58        int8_t ringenabled;
59        char *bpffilter;
60};
61
62struct pfringzc_per_thread {
63
64        uint32_t lastbatch;
65        uint32_t nextpacket;
66        pfring_zc_pkt_buff ** buffers;
67}
68
69
70struct pfringzc_format_data_t {
71        pfring_zc_cluster *cluster;
72        pfring_zc_worker *hasher;
73        pfring_zc_buffer_pool *pool;
74
75        pfring_zc_queue **inqueues;
76        pfring_zc_queue **outqueues;
77        uint16_t clusterid;     
78        int numthreads;
79
80        struct pfringzc_per_thread *perthreads;
81
82        int8_t promisc;
83        int snaplen;
84        char *bpffilter;
85        enum hasher_types hashtype;
86
87};
88
89struct pfring_per_stream_t {
90
91        pfring *pd;
92        int affinity;
93
94} ALIGN_STRUCT(CACHE_LINE_SIZE);
95
96#define ZERO_PFRING_STREAM {NULL, -1}
97
98#define DATA(x) ((struct pfring_format_data_t *)x->format_data)
99#define ZCDATA(x) ((struct pfringzc_format_data_t *)x->format_data)
100#define STREAM_DATA(x) ((struct pfring_per_stream_t *)x->data)
101
102#define FORMAT_DATA DATA(libtrace)
103#define ZC_FORMAT_DATA ZCDATA(libtrace)
104#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
105#define FORMAT_DATA_FIRST ((struct pfring_per_stream_t *)FORMAT_DATA_HEAD->data)
106
107#define PFRINGZC_BATCHSIZE 10
108
109typedef union {
110        uint32_t ipv4;
111        uint8_t ipv6[16];
112} ip_addr_union;
113
114struct tunnelinfo {
115        uint32_t id;
116        uint8_t tunneledproto;
117        ip_addr_union tunnel_src;
118        ip_addr_union tunnel_dst;
119        uint16_t tunnel_srcport;
120        uint16_t tunnel_dstport;
121};
122
123struct pktoffset {
124        int16_t ethoffset;
125        int16_t vlanoffset;
126        int16_t l3offset;
127        int16_t l4offset;
128        int16_t payloadoffset;
129};
130
131struct parsing_info {
132        uint8_t dmac[ETH_ALEN];
133        uint8_t smac[ETH_ALEN];
134        uint16_t eth_type;
135        uint16_t vlan_id;
136        uint8_t ip_version;
137        uint8_t l3_proto;
138        uint8_t ip_tos;
139        ip_addr_union ip_src;
140        ip_addr_union ip_dst;
141        uint16_t l4_src_port;
142        uint16_t l4_dst_port;
143        struct {
144                uint8_t flags;
145                uint32_t seqno;
146                uint32_t ackno;
147        } tcp;
148        struct tunnelinfo tunnel;
149        uint16_t last_matched_plugin;
150        uint16_t last_matched_rule;
151        struct pktoffset offset;
152
153};
154
155struct libtrace_pfring_extend {
156
157        uint64_t ts_ns;
158        uint32_t flags;
159        uint8_t direction;
160        int32_t if_index;
161        uint32_t hash;
162        struct {
163                int bounce_iface;
164                void *reserved;
165        } tx;
166        uint16_t parsed_hdr_len;
167        struct parsing_info parsed;
168};
169
170struct local_pfring_header {
171        struct timeval ts;
172        uint32_t caplen;
173        uint32_t wlen;
174        struct libtrace_pfring_extend ext;     
175       
176};
177
178#define PFRING_BYTEORDER_BIGENDIAN 0
179#define PFRING_BYTEORDER_LITTLEENDIAN 1
180
181#if __BYTE_ORDER == __BIG_ENDIAN
182#define PFRING_MY_BYTEORDER PFRING_BYTEORDER_BIGENDIAN
183#else
184#define PFRING_MY_BYTEORDER PFRING_BYTEORDER_LITTLEENDIAN
185#endif
186
187
188struct libtrace_pfring_header {
189        uint8_t byteorder;
190        struct {
191                uint64_t tv_sec;
192                uint64_t tv_usec;
193        } ts;
194        uint32_t caplen;
195        uint32_t wlen;
196        struct libtrace_pfring_extend ext;     
197       
198};
199
200static inline int pfring_start_input_stream(libtrace_t *libtrace,
201                struct pfring_per_stream_t *stream) {
202
203        int rc;
204
205        if (FORMAT_DATA->bpffilter) {
206                rc = pfring_set_bpf_filter(stream->pd, FORMAT_DATA->bpffilter);
207                if (rc != 0) {
208                        trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
209                                "Failed to set BPF filter on pfring:");
210                        return -1;
211                }
212        }
213
214        if ((rc = pfring_set_socket_mode(stream->pd, recv_only_mode)) != 0) {
215                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
216                                "Failed to set recv only mode on pfring:");
217                return -1;
218        }
219
220        if (pfring_enable_ring(stream->pd) != 0) {
221                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, 
222                        "Failed to enable the pfring");
223                return -1;
224        }
225       
226        return 0;
227
228}
229
230static inline uint32_t pfring_flags(libtrace_t *libtrace) {
231        uint32_t flags = PF_RING_TIMESTAMP | PF_RING_LONG_HEADER;
232        flags |= PF_RING_HW_TIMESTAMP;
233        flags |= PF_RING_DO_NOT_PARSE;
234
235        if (FORMAT_DATA->promisc > 0) 
236                flags |= PF_RING_PROMISC;
237        return flags;
238}       
239
240static inline int pfringzc_init_queues(libtrace_t *libtrace, 
241                struct pfringzc_format_data_t *fdata, int threads) {
242
243        int i, j;
244        char devname[4096];
245
246        fdata->inqueues = calloc(threads, sizeof(pfring_zc_queue *));
247        fdata->outqueues = calloc(threads, sizeof(pfring_zc_queue *));
248        fdata->perthreads = calloc(threads, sizeof(struct pfringzc_per_thread));
249
250        for (i = 0; i < threads; i++) {
251                snprintf(devname, 4095, "zc:%s@%d", libtrace->uridata, i);
252               
253                fdata->perthreads[i]->buffers = calloc(PFRINGZC_BATCHSIZE, sizeof(pfring_zc_pkt_buff *));
254                fdata->perthreads[i]->lastbatch = 0;
255                fdata->perthreads[i]->nextpacket = 0;
256
257                for (j = 0; j < PFRINGZC_BATCHSIZE; j++) {
258                        fdata->perthreads[i]->buffers[j] = pfring_zc_get_packet_handle(fdata->cluster);
259               
260                        if (fdata->perthreads[i]->buffers[j] == NULL) {
261                                trace_set_err(libtrace, errno, "Failed to create pfringzc packet handle");
262                                goto error;
263                        }
264                }
265               
266                fdata->inqueues[i] = pfring_zc_open_device(fdata->cluster,
267                                devname, rx_only, 0);
268                if (data->inqueues[i] == NULL) {
269                        trace_set_err(libtrace, errno, "Failed to create pfringzc in queue");
270                        goto error;
271                }
272
273
274                fdata->outqueues[i] = pfring_zc_create_queue(fdata->cluster,
275                                8192);
276                if (data->outqueues[i] == NULL) {
277                        trace_set_err(libtrace, errno, "Failed to create pfringzc out queue");
278                        goto error;
279                }
280
281        }
282
283        fdata->pool = pfring_zc_create_buffer_pool(fdata->cluster, 8);
284        if (fdata->pool == NULL) {
285                trace_set_err(libtrace, errno, "Failed to create pfringzc buffer pool");
286                goto error;
287        }
288
289        return 0;
290
291error:
292        //pfringzc_destroy_queues(libtrace, fdata, threads);
293        return -1;
294
295}
296
297static int pfringzc_start_input(libtrace_t *libtrace) {
298
299        if (ZC_FORMAT_DATA->cluster != NULL) {
300                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
301                        "Attempted to start a pfringzc: input that was already started!");
302                return -1;
303        }
304
305        if (libtrace->uridata == NULL) {
306                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 
307                                "Missing interface name from pfringzc: URI");
308                return -1;
309        }
310
311        ZC_FORMAT_DATA->cluster = pfring_zc_create_cluster(
312                        ZC_FORMAT_DATA->clusterid,
313                        1600,   /* TODO calculate */
314                        0,      /* meta-data length */
315                        8192 * 32687 + PFRINGZC_BATCHSIZE,  /* number of buffers */
316                        pfring_zc_numa_get_cpu_node(0), /* bind to core 0 */
317                        NULL    /* auto hugetlb mountpoint */
318                        );
319        if (ZC_FORMAT_DATA->cluster == NULL) {
320                trace_set_err(libtrace, errno, "Failed to create pfringzc cluster");
321                return -1;
322        }
323
324        if (pfringzc_init_queues(libtrace, ZC_FORMAT_DATA, 1) == -1)
325                return -1;
326
327        /* No hasher necessary, as we just have one thread */
328        ZC_FORMAT_DATA->hasher = pfring_zc_run_balancer(
329                ZC_FORMAT_DATA->inqueues, ZC_FORMAT_DATA->outqueues, 1, 1,
330                ZC_FORMAT_DATA->pool, round_robin_bursts_policy, NULL,
331                NULL, NULL, 1, 0);
332        return 0;
333}
334
335static int pfring_start_input(libtrace_t *libtrace) {
336        struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST;
337        int rc = 0;
338
339        if (libtrace->uridata == NULL) {
340                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 
341                                "Missing interface name from pfring: URI");
342                return -1;
343        }
344        if (FORMAT_DATA->ringenabled) {
345                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
346                        "Attempted to start a pfring: input that was already started!");
347                return -1;
348        }
349
350        stream->pd = pfring_open(libtrace->uridata, FORMAT_DATA->snaplen, 
351                pfring_flags(libtrace));
352        if (stream->pd == NULL) {
353                trace_set_err(libtrace, errno, "pfring_open failed: %s",
354                                strerror(errno));
355                return -1;
356        }
357
358        rc = pfring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
359        if (rc < 0)
360                return rc;     
361        FORMAT_DATA->ringenabled = 1;
362        return rc;
363}
364
365static int pfring_pstart_input(libtrace_t *libtrace) {
366        pfring *ring[MAX_NUM_RX_CHANNELS];
367        uint8_t channels;
368        struct pfring_per_stream_t empty = ZERO_PFRING_STREAM;
369        int i, iserror = 0;
370       
371        if (libtrace->uridata == NULL) {
372                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT, 
373                                "Missing interface name from pfring: URI");
374                return -1;
375        }
376        if (FORMAT_DATA->ringenabled) {
377                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
378                        "Attempted to start a pfring: input that was already started!");
379                return -1;
380        }
381
382        channels = pfring_open_multichannel(libtrace->uridata, 
383                        FORMAT_DATA->snaplen, pfring_flags(libtrace), ring);
384        if (channels <= 0) {
385                trace_set_err(libtrace, errno, 
386                                "pfring_open_multichannel failed: %s",
387                                strerror(errno));
388                return -1;
389        }
390
391        printf("got %u channels\n", channels);
392
393        if (libtrace->perpkt_thread_count < channels) {
394                fprintf(stderr, "WARNING: pfring interface has %u channels, "
395                                "but this libtrace program has only enough "
396                                "threads to read the first %u channels.",
397                                channels, libtrace->perpkt_thread_count);
398        }
399
400        if (channels < libtrace->perpkt_thread_count)
401                libtrace->perpkt_thread_count = channels;
402       
403
404        for (i = 0; i < channels; i++) {
405                struct pfring_per_stream_t *stream;
406                if (libtrace_list_get_size(FORMAT_DATA->per_stream)<=(size_t)i)
407                        libtrace_list_push_back(FORMAT_DATA->per_stream, &empty);
408
409                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
410                stream->pd = ring[i];
411                if (pfring_start_input_stream(libtrace, stream) != 0) {
412                        iserror = 1;
413                        break;
414                }
415        }
416
417        if (iserror) {
418                /* Error state: free any streams we managed to create */
419                for (i = i - 1; i >= 0; i--) {
420                        struct pfring_per_stream_t *stream;
421                        stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
422
423                        pfring_disable_ring(stream->pd);       
424                        pfring_remove_bpf_filter(stream->pd);
425                        pfring_close(stream->pd);
426                }
427                return -1;
428        }
429        FORMAT_DATA->ringenabled = 1;
430        return 0;
431}
432
433
434static int pfring_init_input(libtrace_t *libtrace) {
435
436        struct pfring_per_stream_t stream_data = ZERO_PFRING_STREAM;
437
438        libtrace->format_data = (struct pfring_format_data_t *)
439                malloc(sizeof(struct pfring_format_data_t));
440        assert(libtrace->format_data != NULL);
441
442        FORMAT_DATA->promisc = -1;
443        FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE;
444        FORMAT_DATA->per_stream = libtrace_list_init(sizeof(stream_data));
445        FORMAT_DATA->ringenabled = 0;
446        FORMAT_DATA->bpffilter = NULL;
447
448        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
449
450        return 0;
451}
452
453static int pfringzc_init_input(libtrace_t *libtrace) {
454
455        libtrace->format_data = (struct pfringzc_format_data_t *)
456                malloc(sizeof(struct pfringzc_format_data_t));
457        assert(libtrace->format_data != NULL);
458       
459        ZC_FORMAT_DATA->promisc = -1;
460        ZC_FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE;
461        ZC_FORMAT_DATA->bpffilter = NULL;
462
463        ZC_FORMAT_DATA->cluster = NULL;
464        ZC_FORMAT_DATA->inqueue = NULL;
465        ZC_FORMAT_DATA->outqueues = NULL;
466        ZC_FORMAT_DATA->buffers = NULL;
467        ZC_FORMAT_DATA->pool = NULL;
468        ZC_FORMAT_DATA->hasher = NULL;
469        ZC_FORMAT_DATA->hashtype = HASHER_BIDIRECTIONAL;
470        ZC_FORMAT_DATA->clusterid = (uint16_t)rand();
471
472        return 0;
473}
474
475static int pfringzc_config_input(libtrace_t *libtrace, trace_option_t option,
476                void *data) {
477
478        switch (option) {
479                case TRACE_OPTION_SNAPLEN:
480                        ZC_FORMAT_DATA->snaplen = *(int *)data;
481                        return 0;
482                case TRACE_OPTION_PROMISC:
483                        ZC_FORMAT_DATA->promisc = *(int *)data;
484                        return 0;
485                case TRACE_OPTION_FILTER:
486                        ZC_FORMAT_DATA->bpffilter = strdup((char *)data);
487                        return 0;
488                case TRACE_OPTION_HASHER:
489                        /* We can do bidirectional hashing on hardware
490                         * by default, thanks to the ZC library */
491                        ZC_FORMAT_DATA->hashtype = *((enum hasher_types *)data);
492                        switch (*((enum hasher_types *)data)) {
493                                case HASHER_BIDIRECTIONAL:
494                                case HASHER_UNIDIRECTIONAL:
495                                        return 0;
496                                case HASHER_BALANCE:
497                                        return 0;               
498                                case HASHER_CUSTOM:
499                                        return -1;
500                        }
501                        break;
502                case TRACE_OPTION_META_FREQ:
503                        break;
504                case TRACE_OPTION_EVENT_REALTIME:
505                        break;
506        }
507        return -1;
508}
509
510static int pfring_config_input(libtrace_t *libtrace, trace_option_t option,
511                void *data) {
512
513        switch (option) {
514                case TRACE_OPTION_SNAPLEN:
515                        FORMAT_DATA->snaplen = *(int *)data;
516                        return 0;
517                case TRACE_OPTION_PROMISC:
518                        FORMAT_DATA->promisc = *(int *)data;
519                        return 0;
520                case TRACE_OPTION_FILTER:
521                        FORMAT_DATA->bpffilter = strdup((char *)data);
522                        return 0;
523                case TRACE_OPTION_HASHER:
524                        /* We can do unidirectional hashing on hardware
525                         * by default, but symmetric hash requires the
526                         * extra ZC or DNA drivers. */
527                        switch (*((enum hasher_types *)data)) {
528                                case HASHER_UNIDIRECTIONAL:
529                                        return 0;
530                                case HASHER_BALANCE:
531                                case HASHER_CUSTOM:
532                                case HASHER_BIDIRECTIONAL:
533                                        return -1;
534                        }
535                        break;
536                case TRACE_OPTION_META_FREQ:
537                        break;
538                case TRACE_OPTION_EVENT_REALTIME:
539                        break;
540        }
541        return -1;
542}
543
544static int pfring_pause_input(libtrace_t *libtrace) {
545        size_t i;
546
547        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
548                struct pfring_per_stream_t *stream;
549                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
550                pfring_disable_ring(stream->pd);       
551                pfring_remove_bpf_filter(stream->pd);
552                pfring_close(stream->pd);
553        }
554
555        FORMAT_DATA->ringenabled = 0;
556        return 0;
557
558}
559
560static int pfringzc_pause_input(libtrace_t *libtrace) {
561
562        /* hopefully this will clean up our buffers and queues? */
563        pfring_zc_kill_worker(ZC_FORMAT_DATA->hasher);
564        pfring_zc_destroy_cluster(ZC_FORMAT_DATA->cluster);
565        return 0;
566}
567
568static int pfring_fin_input(libtrace_t *libtrace) {
569
570        if (libtrace->format_data) {
571                if (FORMAT_DATA->bpffilter)
572                        free(FORMAT_DATA->bpffilter);
573                if (FORMAT_DATA->per_stream) 
574                        libtrace_list_deinit(FORMAT_DATA->per_stream);
575                free(libtrace->format_data);
576        }
577        return 0;
578}
579
580
581static int pfringzc_fin_input(libtrace_t *input) {
582        if (libtrace->format_data) {
583                if (ZC_FORMAT_DATA->bpffilter)
584                        free(ZC_FORMAT_DATA->bpffilter);
585                free(libtrace->format_data);
586        }
587        return 0;
588
589}
590
591static int pfring_get_capture_length(const libtrace_packet_t *packet) {
592        struct libtrace_pfring_header *phdr;
593        uint32_t wlen, caplen;
594        phdr = (struct libtrace_pfring_header *)packet->header;
595
596        if (packet->payload == NULL)
597                return 0;
598
599        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
600                wlen = byteswap32(phdr->wlen);
601                caplen = byteswap32(phdr->caplen);
602        } else {
603                wlen = phdr->wlen;
604                caplen = phdr->caplen;
605        }
606
607        if (wlen < caplen)
608                return wlen;
609        return caplen;
610       
611}
612
613static int pfring_get_wire_length(const libtrace_packet_t *packet) {
614        struct libtrace_pfring_header *phdr;
615        phdr = (struct libtrace_pfring_header *)packet->header;
616        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
617                return byteswap32(phdr->wlen);
618        }
619        return phdr->wlen;
620}
621
622static int pfring_get_framing_length(UNUSED const libtrace_packet_t *packet) {
623        return sizeof(struct libtrace_pfring_header);
624}
625
626static int pfring_prepare_packet(libtrace_t *libtrace UNUSED, 
627                libtrace_packet_t *packet, void *buffer, 
628                libtrace_rt_types_t rt_type, uint32_t flags) {
629
630
631        if (packet->buffer != buffer && packet->buf_control == 
632                        TRACE_CTRL_PACKET) {
633                free(packet->buffer);
634        }
635
636        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
637                packet->buf_control = TRACE_CTRL_PACKET;
638        } else {
639                packet->buf_control = TRACE_CTRL_EXTERNAL;
640        }
641
642        packet->type = rt_type;
643        packet->buffer = buffer;
644        packet->header = buffer;
645        packet->payload = (buffer + sizeof(struct libtrace_pfring_header));
646
647        return 0;
648}
649
650static int pfringzc_read_batch(libtrace_t *libtrace, int oq, uint8_t block,
651                libtrace_message_queue_t *queue) {
652
653        int received;
654
655        do {
656                received = pfring_zc_recv_pkt_burst(
657                                ZC_FORMAT_DATA->outqueues[oq], 
658                                ZC_FORMAT_DATA->buffers[oq],
659                                PFRINGZC_BATCHSIZE,
660                                0);
661               
662                if (received < 0) {
663                        trace_set_err(libtrace, errno, "Failed to read packet batch from pfringzc:");
664                        return -1;
665                }
666
667                if (received == 0) {
668                        if (queue && libtrace_message_queue_count(queue) > 0)
669                                return READ_MESSAGE;
670                        continue;
671                }
672
673                ZC_FORMAT_DATA->lastbatch[oq] = received;
674                ZC_FORMAT_DATA->nextpacket[oq] = 0;             
675
676        } while (block);
677        return 0;
678}
679
680static int pfring_read_generic(libtrace_t *libtrace, libtrace_packet_t *packet,
681                struct pfring_per_stream_t *stream, uint8_t block, 
682                libtrace_message_queue_t *queue)
683{
684
685        struct libtrace_pfring_header *hdr;
686        struct local_pfring_header local;
687        int rc;
688
689        if (packet->buf_control == TRACE_CTRL_EXTERNAL || !packet->buffer) {
690                packet->buffer = malloc((size_t)LIBTRACE_PACKET_BUFSIZE);
691                if (!packet->buffer) {
692                        trace_set_err(libtrace, errno, 
693                                "Cannot allocate memory for packet buffer");
694                        return -1;
695                }
696        }
697       
698        hdr = (struct libtrace_pfring_header *)packet->buffer;
699        do {
700                if ((rc = pfring_recv(stream->pd, (u_char **)&packet->payload, 
701                        0, (struct pfring_pkthdr *)&local, 0)) == -1)
702                {
703                        trace_set_err(libtrace, errno, "Failed to read packet from pfring:");
704                        return -1;
705                }
706
707                if (rc == 0) {
708                        if (queue && libtrace_message_queue_count(queue) > 0)
709                                return READ_MESSAGE;
710                        continue;
711                }
712                break;
713        } while (block);
714
715        if (rc == 0)
716                return 0;
717
718#if __BYTE_ORDER == __LITTLE_ENDIAN
719        hdr->byteorder = PFRING_BYTEORDER_LITTLEENDIAN;
720#else
721        hdr->byteorder = PFRING_BYTEORDER_BIGENDIAN;
722#endif
723
724        hdr->caplen = (local.caplen);
725        hdr->wlen = (local.wlen);
726        hdr->ext.ts_ns = (local.ext.ts_ns);
727        hdr->ext.flags = (local.ext.flags);
728        hdr->ext.if_index = (local.ext.if_index);
729        hdr->ext.hash = (local.ext.hash);
730        hdr->ext.tx.bounce_iface = (local.ext.tx.bounce_iface);
731        hdr->ext.parsed_hdr_len = (local.ext.parsed_hdr_len);
732        hdr->ext.direction = local.ext.direction;
733
734
735        /* I think we can ignore parsed as it will only be populated if
736         * we call pfring_parse_pkt (?)
737         */
738
739        packet->trace = libtrace;
740        packet->type = TRACE_RT_DATA_PFRING;
741        packet->header = packet->buffer;
742        packet->error = 1;
743
744        return pfring_get_capture_length(packet) + 
745                        pfring_get_framing_length(packet);
746
747}
748
749static int pfringzc_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
750{
751
752        struct pfringzc_per_thread *pzt = ZC_FORMAT_DATA->perthreads[0];
753
754        if (pzt->nextpacket >= pzt->lastbatch) {
755                /* Read a fresh batch of packets */
756                if (pfringzc_read_batch(libtrace, 0, 1, NULL) < 0) {
757                        return -1;
758                }
759               
760        }
761
762        pfring_zc_pkt_buff *pbuf = pzt->buffers[pzt->nextpacket];
763        pzt->nextpacket ++;
764
765       
766
767}
768
769static int pfring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
770{
771        return pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 1, NULL);
772}
773
774static libtrace_linktype_t pfring_get_link_type(const libtrace_packet_t *packet UNUSED)
775{
776        return TRACE_TYPE_ETH;
777}
778
779static libtrace_direction_t lt_pfring_set_direction(libtrace_packet_t *packet,
780                libtrace_direction_t dir) {
781
782        struct libtrace_pfring_header *phdr;
783
784        phdr = (struct libtrace_pfring_header *)packet->header;
785        phdr->ext.direction = dir;
786        return dir;     
787}
788
789static libtrace_direction_t pfring_get_direction(
790                const libtrace_packet_t *packet) {
791
792        struct libtrace_pfring_header *phdr;
793        phdr = (struct libtrace_pfring_header *)packet->header;
794        return phdr->ext.direction;
795}
796
797static uint64_t pfring_get_erf_timestamp(const libtrace_packet_t *packet) {
798        uint64_t ts;
799        struct libtrace_pfring_header *phdr;
800        phdr = (struct libtrace_pfring_header *)packet->header;
801
802        if (phdr->ext.ts_ns) {
803                uint64_t tns;
804                if (phdr->byteorder == PFRING_MY_BYTEORDER)
805                        tns = phdr->ext.ts_ns;
806                else
807                        tns = byteswap64(phdr->ext.ts_ns);
808
809                ts = ((tns / 1000000000) << 32);
810                ts += ((tns % 1000000000) << 32) / 1000000000;
811        } else {
812                uint64_t sec, usec;
813                if (phdr->byteorder == PFRING_MY_BYTEORDER) {
814                        sec = (uint64_t)(phdr->ts.tv_sec);
815                        usec = (uint64_t)(phdr->ts.tv_usec);
816                } else {
817                        sec = (uint64_t)byteswap32(phdr->ts.tv_sec);
818                        usec = (uint64_t)byteswap32(phdr->ts.tv_usec);
819                }
820
821                ts = (sec << 32);
822                ts += ((usec << 32)/1000000);
823        }
824        return ts;
825               
826
827}
828static size_t pfring_set_capture_length(libtrace_packet_t *packet, size_t size)
829{
830        struct libtrace_pfring_header *phdr;
831        phdr = (struct libtrace_pfring_header *)packet->header;
832
833        if (size > trace_get_capture_length(packet)) {
834                /* Can't make a packet larger */
835                return trace_get_capture_length(packet);
836        }
837
838        packet->capture_length = -1;
839        if (phdr->byteorder != PFRING_MY_BYTEORDER) {
840                phdr->caplen = byteswap32(size);
841        } else {
842                phdr->caplen = size;
843        }
844        return trace_get_capture_length(packet);
845}
846
847static void pfring_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
848
849        pfring_stat st;
850
851        size_t i;
852
853        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
854                struct pfring_per_stream_t *stream;
855                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
856
857                if (pfring_stats(stream->pd, &st) != 0) {
858                        trace_set_err(libtrace, errno, "Failed to get statistics for pfring stream %u", (uint32_t)i);
859                        continue;
860                }
861
862                if (stat->dropped_valid) {
863                        stat->dropped += st.drop;
864                } else {
865                        stat->dropped = st.drop;
866                        stat->dropped_valid = 1;
867                }
868
869                if (stat->received_valid) {
870                        stat->received += st.recv;
871                } else {
872                        stat->received = st.recv;
873                        stat->received_valid = 1;
874                }
875        }
876
877}
878
879static libtrace_eventobj_t pfring_event(libtrace_t *libtrace, 
880                libtrace_packet_t *packet) {
881
882        libtrace_eventobj_t event = {0,0,0.0,0};
883        int rc;
884
885        rc = pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 0, NULL);
886       
887        if (rc > 0) {
888                event.size = rc;
889                event.type = TRACE_EVENT_PACKET;
890        } else if (rc == 0) {
891                if (libtrace_halt) {
892                        event.type = TRACE_EVENT_TERMINATE;
893                } else {
894                        event.type = TRACE_EVENT_IOWAIT;
895                        event.fd = pfring_get_selectable_fd(FORMAT_DATA_FIRST->pd);
896                }
897        } else {
898                event.type = TRACE_EVENT_TERMINATE;
899        }
900        return event;
901}
902
903static int pfring_pread_packets(libtrace_t *libtrace,
904                libtrace_thread_t *t, 
905                libtrace_packet_t *packets[],
906                size_t nb_packets) {
907
908        size_t readpackets = 0;
909        int rc = 0;
910        struct pfring_per_stream_t *stream = (struct pfring_per_stream_t *)t->format_data;
911        uint8_t block = 1;
912
913        /* Block for the first packet, then read up to nb_packets if they
914         * are available. */
915        do {
916                rc = pfring_read_generic(libtrace, packets[readpackets], 
917                        stream, block, &t->messages);
918                if (rc == READ_MESSAGE) {
919                        if (readpackets == 0) {
920                                return rc;
921                        }
922                        break;
923                }
924                               
925                if (rc == READ_ERROR)
926                        return rc;
927
928                if (rc == 0)
929                        continue;
930               
931                block = 0;
932                readpackets ++;
933                if (readpackets >= nb_packets)
934                        break;
935
936        } while (rc != 0);
937
938        return readpackets;
939}
940
941static int pfring_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
942                bool reading) {
943
944        uint32_t cpus = trace_get_number_of_cores();
945
946        if (reading) {
947                struct pfring_per_stream_t *stream;
948                int tid = 0;
949                if (t->type == THREAD_PERPKT) {
950                        t->format_data = libtrace_list_get_index(FORMAT_DATA->per_stream, t->perpkt_num)->data;
951                        if (t->format_data == NULL) {
952                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
953                                                "Too many threads registered");
954                                return -1;
955                        }
956                        tid = t->perpkt_num;
957                } else {
958                        t->format_data = FORMAT_DATA_FIRST;
959                }
960
961                stream = t->format_data;
962                if (cpus > 1) {
963                        cpu_set_t cpuset;
964                        uint32_t coreid;
965                        int s;
966
967                        coreid = (tid + 1) % cpus;
968                        CPU_ZERO(&cpuset);
969                        CPU_SET(coreid, &cpuset);
970                        if ((s = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset)) != 0) {
971                                trace_set_err(libtrace, errno, "Warning "
972                                                "failed to set affinity for "
973                                                "pfring thread");
974                                return -1;
975                        }
976                        stream->affinity = coreid;
977                }
978        }
979
980        return 0;               
981
982}
983
984static struct libtrace_format_t pfringformat = {
985        "pfring",
986        "$Id$",
987        TRACE_FORMAT_PFRING,
988        NULL,                           /* probe filename */
989        NULL,                           /* probe magic */
990        pfring_init_input,                /* init_input */
991        pfring_config_input,              /* config_input */
992        pfring_start_input,               /* start_input */
993        pfring_pause_input,               /* pause_input */
994        NULL,                           /* init_output */
995        NULL,                           /* config_output */
996        NULL,                           /* start_output */
997        pfring_fin_input,                 /* fin_input */
998        NULL,                           /* fin_output */
999        pfring_read_packet,               /* read_packet */
1000        pfring_prepare_packet,            /* prepare_packet */
1001        NULL,                           /* fin_packet */
1002        NULL,                             /* write_packet */
1003        pfring_get_link_type,             /* get_link_type */
1004        pfring_get_direction,             /* get_direction */
1005        lt_pfring_set_direction,             /* set_direction */
1006        pfring_get_erf_timestamp,         /* get_erf_timestamp */
1007        NULL,               /* get_timeval */
1008        NULL,                           /* get_seconds */
1009        NULL,                           /* get_timespec */
1010        NULL,                           /* seek_erf */
1011        NULL,                           /* seek_timeval */
1012        NULL,                           /* seek_seconds */
1013        pfring_get_capture_length,        /* get_capture_length */
1014        pfring_get_wire_length,           /* get_wire_length */
1015        pfring_get_framing_length,        /* get_framing_length */
1016        pfring_set_capture_length,        /* set_capture_length */
1017        NULL,                           /* get_received_packets */
1018        NULL,                           /* get_filtered_packets */
1019        NULL,                           /* get_dropped_packets */
1020        pfring_get_statistics,          /* get_statistics */
1021        NULL,                           /* get_fd */
1022        pfring_event,              /* trace_event */
1023        NULL,                      /* help */
1024        NULL,                   /* next pointer */
1025        {true, MAX_NUM_RX_CHANNELS},         /* Live, with thread limit */
1026        pfring_pstart_input,         /* pstart_input */
1027        pfring_pread_packets,        /* pread_packets */
1028        pfring_pause_input,        /* ppause */
1029        pfring_fin_input,          /* p_fin */
1030        pfring_pregister_thread,        /* register thread */ 
1031        NULL,                           /* unregister thread */
1032        NULL                            /* get thread stats */
1033
1034};
1035
1036void pfring_constructor(void) {
1037        register_format(&pfringformat);
1038}
Note: See TracBrowser for help on using the repository browser.