Ignore:
Timestamp:
01/28/16 17:30:32 (7 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
pfring
Children:
93c9aff
Parents:
e2cd7d8
Message:

Started on pfring_zc format

Doesn't compile yet, but needed to commit this so I could work on
some other things without worrying about losing this work.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_pfring.c

    re2cd7d8 ra21b45e  
    5050
    5151#include <pfring.h>
     52#include <pfring_zc.h>
    5253
    5354struct pfring_format_data_t {
     
    5960};
    6061
     62struct pfringzc_per_thread {
     63
     64        uint32_t lastbatch;
     65        uint32_t nextpacket;
     66        pfring_zc_pkt_buff ** buffers;
     67}
     68
     69
     70struct pfringzc_format_data_t {
     71        pfring_zc_cluster *cluster;
     72        pfring_zc_worker *hasher;
     73        pfring_zc_buffer_pool *pool;
     74
     75        pfring_zc_queue **inqueues;
     76        pfring_zc_queue **outqueues;
     77        uint16_t clusterid;     
     78        int numthreads;
     79
     80        struct pfringzc_per_thread *perthreads;
     81
     82        int8_t promisc;
     83        int snaplen;
     84        char *bpffilter;
     85        enum hasher_types hashtype;
     86
     87};
     88
    6189struct pfring_per_stream_t {
    6290
     
    6997
    7098#define DATA(x) ((struct pfring_format_data_t *)x->format_data)
     99#define ZCDATA(x) ((struct pfringzc_format_data_t *)x->format_data)
    71100#define STREAM_DATA(x) ((struct pfring_per_stream_t *)x->data)
    72101
    73102#define FORMAT_DATA DATA(libtrace)
     103#define ZC_FORMAT_DATA ZCDATA(libtrace)
    74104#define FORMAT_DATA_HEAD FORMAT_DATA->per_stream->head
    75105#define FORMAT_DATA_FIRST ((struct pfring_per_stream_t *)FORMAT_DATA_HEAD->data)
    76106
     107#define PFRINGZC_BATCHSIZE 10
    77108
    78109typedef union {
     
    207238}       
    208239
     240static inline int pfringzc_init_queues(libtrace_t *libtrace,
     241                struct pfringzc_format_data_t *fdata, int threads) {
     242
     243        int i, j;
     244        char devname[4096];
     245
     246        fdata->inqueues = calloc(threads, sizeof(pfring_zc_queue *));
     247        fdata->outqueues = calloc(threads, sizeof(pfring_zc_queue *));
     248        fdata->perthreads = calloc(threads, sizeof(struct pfringzc_per_thread));
     249
     250        for (i = 0; i < threads; i++) {
     251                snprintf(devname, 4095, "zc:%s@%d", libtrace->uridata, i);
     252               
     253                fdata->perthreads[i]->buffers = calloc(PFRINGZC_BATCHSIZE, sizeof(pfring_zc_pkt_buff *));
     254                fdata->perthreads[i]->lastbatch = 0;
     255                fdata->perthreads[i]->nextpacket = 0;
     256
     257                for (j = 0; j < PFRINGZC_BATCHSIZE; j++) {
     258                        fdata->perthreads[i]->buffers[j] = pfring_zc_get_packet_handle(fdata->cluster);
     259               
     260                        if (fdata->perthreads[i]->buffers[j] == NULL) {
     261                                trace_set_err(libtrace, errno, "Failed to create pfringzc packet handle");
     262                                goto error;
     263                        }
     264                }
     265               
     266                fdata->inqueues[i] = pfring_zc_open_device(fdata->cluster,
     267                                devname, rx_only, 0);
     268                if (data->inqueues[i] == NULL) {
     269                        trace_set_err(libtrace, errno, "Failed to create pfringzc in queue");
     270                        goto error;
     271                }
     272
     273
     274                fdata->outqueues[i] = pfring_zc_create_queue(fdata->cluster,
     275                                8192);
     276                if (data->outqueues[i] == NULL) {
     277                        trace_set_err(libtrace, errno, "Failed to create pfringzc out queue");
     278                        goto error;
     279                }
     280
     281        }
     282
     283        fdata->pool = pfring_zc_create_buffer_pool(fdata->cluster, 8);
     284        if (fdata->pool == NULL) {
     285                trace_set_err(libtrace, errno, "Failed to create pfringzc buffer pool");
     286                goto error;
     287        }
     288
     289        return 0;
     290
     291error:
     292        //pfringzc_destroy_queues(libtrace, fdata, threads);
     293        return -1;
     294
     295}
     296
     297static int pfringzc_start_input(libtrace_t *libtrace) {
     298
     299        if (ZC_FORMAT_DATA->cluster != NULL) {
     300                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     301                        "Attempted to start a pfringzc: input that was already started!");
     302                return -1;
     303        }
     304
     305        if (libtrace->uridata == NULL) {
     306                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     307                                "Missing interface name from pfringzc: URI");
     308                return -1;
     309        }
     310
     311        ZC_FORMAT_DATA->cluster = pfring_zc_create_cluster(
     312                        ZC_FORMAT_DATA->clusterid,
     313                        1600,   /* TODO calculate */
     314                        0,      /* meta-data length */
     315                        8192 * 32687 + PFRINGZC_BATCHSIZE,  /* number of buffers */
     316                        pfring_zc_numa_get_cpu_node(0), /* bind to core 0 */
     317                        NULL    /* auto hugetlb mountpoint */
     318                        );
     319        if (ZC_FORMAT_DATA->cluster == NULL) {
     320                trace_set_err(libtrace, errno, "Failed to create pfringzc cluster");
     321                return -1;
     322        }
     323
     324        if (pfringzc_init_queues(libtrace, ZC_FORMAT_DATA, 1) == -1)
     325                return -1;
     326
     327        /* No hasher necessary, as we just have one thread */
     328        ZC_FORMAT_DATA->hasher = pfring_zc_run_balancer(
     329                ZC_FORMAT_DATA->inqueues, ZC_FORMAT_DATA->outqueues, 1, 1,
     330                ZC_FORMAT_DATA->pool, round_robin_bursts_policy, NULL,
     331                NULL, NULL, 1, 0);
     332        return 0;
     333}
     334
    209335static int pfring_start_input(libtrace_t *libtrace) {
    210336        struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST;
     
    323449
    324450        return 0;
     451}
     452
     453static int pfringzc_init_input(libtrace_t *libtrace) {
     454
     455        libtrace->format_data = (struct pfringzc_format_data_t *)
     456                malloc(sizeof(struct pfringzc_format_data_t));
     457        assert(libtrace->format_data != NULL);
     458       
     459        ZC_FORMAT_DATA->promisc = -1;
     460        ZC_FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE;
     461        ZC_FORMAT_DATA->bpffilter = NULL;
     462
     463        ZC_FORMAT_DATA->cluster = NULL;
     464        ZC_FORMAT_DATA->inqueue = NULL;
     465        ZC_FORMAT_DATA->outqueues = NULL;
     466        ZC_FORMAT_DATA->buffers = NULL;
     467        ZC_FORMAT_DATA->pool = NULL;
     468        ZC_FORMAT_DATA->hasher = NULL;
     469        ZC_FORMAT_DATA->hashtype = HASHER_BIDIRECTIONAL;
     470        ZC_FORMAT_DATA->clusterid = (uint16_t)rand();
     471
     472        return 0;
     473}
     474
     475static int pfringzc_config_input(libtrace_t *libtrace, trace_option_t option,
     476                void *data) {
     477
     478        switch (option) {
     479                case TRACE_OPTION_SNAPLEN:
     480                        ZC_FORMAT_DATA->snaplen = *(int *)data;
     481                        return 0;
     482                case TRACE_OPTION_PROMISC:
     483                        ZC_FORMAT_DATA->promisc = *(int *)data;
     484                        return 0;
     485                case TRACE_OPTION_FILTER:
     486                        ZC_FORMAT_DATA->bpffilter = strdup((char *)data);
     487                        return 0;
     488                case TRACE_OPTION_HASHER:
     489                        /* We can do bidirectional hashing on hardware
     490                         * by default, thanks to the ZC library */
     491                        ZC_FORMAT_DATA->hashtype = *((enum hasher_types *)data);
     492                        switch (*((enum hasher_types *)data)) {
     493                                case HASHER_BIDIRECTIONAL:
     494                                case HASHER_UNIDIRECTIONAL:
     495                                        return 0;
     496                                case HASHER_BALANCE:
     497                                        return 0;               
     498                                case HASHER_CUSTOM:
     499                                        return -1;
     500                        }
     501                        break;
     502                case TRACE_OPTION_META_FREQ:
     503                        break;
     504                case TRACE_OPTION_EVENT_REALTIME:
     505                        break;
     506        }
     507        return -1;
    325508}
    326509
     
    375558}
    376559
     560static int pfringzc_pause_input(libtrace_t *libtrace) {
     561
     562        /* hopefully this will clean up our buffers and queues? */
     563        pfring_zc_kill_worker(ZC_FORMAT_DATA->hasher);
     564        pfring_zc_destroy_cluster(ZC_FORMAT_DATA->cluster);
     565        return 0;
     566}
     567
    377568static int pfring_fin_input(libtrace_t *libtrace) {
    378569
     
    388579
    389580
     581static int pfringzc_fin_input(libtrace_t *input) {
     582        if (libtrace->format_data) {
     583                if (ZC_FORMAT_DATA->bpffilter)
     584                        free(ZC_FORMAT_DATA->bpffilter);
     585                free(libtrace->format_data);
     586        }
     587        return 0;
     588
     589}
     590
    390591static int pfring_get_capture_length(const libtrace_packet_t *packet) {
    391592        struct libtrace_pfring_header *phdr;
     
    444645        packet->payload = (buffer + sizeof(struct libtrace_pfring_header));
    445646
     647        return 0;
     648}
     649
     650static int pfringzc_read_batch(libtrace_t *libtrace, int oq, uint8_t block,
     651                libtrace_message_queue_t *queue) {
     652
     653        int received;
     654
     655        do {
     656                received = pfring_zc_recv_pkt_burst(
     657                                ZC_FORMAT_DATA->outqueues[oq],
     658                                ZC_FORMAT_DATA->buffers[oq],
     659                                PFRINGZC_BATCHSIZE,
     660                                0);
     661               
     662                if (received < 0) {
     663                        trace_set_err(libtrace, errno, "Failed to read packet batch from pfringzc:");
     664                        return -1;
     665                }
     666
     667                if (received == 0) {
     668                        if (queue && libtrace_message_queue_count(queue) > 0)
     669                                return READ_MESSAGE;
     670                        continue;
     671                }
     672
     673                ZC_FORMAT_DATA->lastbatch[oq] = received;
     674                ZC_FORMAT_DATA->nextpacket[oq] = 0;             
     675
     676        } while (block);
    446677        return 0;
    447678}
     
    485716                return 0;
    486717
    487         /* Convert the header fields to network byte order so we can
    488          * export them over RT safely. Also deal with 32 vs 64 bit
    489          * timevals! */
    490         //hdr->ts.tv_sec = bswap_host_to_le64((uint64_t)local.ts.tv_sec);
    491         //hdr->ts.tv_usec = bswap_host_to_le64((uint64_t)local.ts.tv_usec);
    492718#if __BYTE_ORDER == __LITTLE_ENDIAN
    493719        hdr->byteorder = PFRING_BYTEORDER_LITTLEENDIAN;
     
    496722#endif
    497723
    498 /*
    499         hdr->caplen = htonl(local.caplen);
    500         hdr->wlen = htonl(local.wlen);
    501         hdr->ext.ts_ns = bswap_host_to_le64(local.ext.ts_ns);
    502         hdr->ext.flags = htonl(local.ext.flags);
    503         hdr->ext.if_index = htonl(local.ext.if_index);
    504         hdr->ext.hash = htonl(local.ext.hash);
    505         hdr->ext.tx.bounce_iface = htonl(local.ext.tx.bounce_iface);
    506         hdr->ext.parsed_hdr_len = htons(local.ext.parsed_hdr_len);
    507 */
    508724        hdr->caplen = (local.caplen);
    509725        hdr->wlen = (local.wlen);
     
    528744        return pfring_get_capture_length(packet) +
    529745                        pfring_get_framing_length(packet);
     746
     747}
     748
     749static int pfringzc_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
     750{
     751
     752        struct pfringzc_per_thread *pzt = ZC_FORMAT_DATA->perthreads[0];
     753
     754        if (pzt->nextpacket >= pzt->lastbatch) {
     755                /* Read a fresh batch of packets */
     756                if (pfringzc_read_batch(libtrace, 0, 1, NULL) < 0) {
     757                        return -1;
     758                }
     759               
     760        }
     761
     762        pfring_zc_pkt_buff *pbuf = pzt->buffers[pzt->nextpacket];
     763        pzt->nextpacket ++;
     764
     765       
    530766
    531767}
Note: See TracChangeset for help on using the changeset viewer.