Changeset 274770a


Ignore:
Timestamp:
11/03/15 17:12:33 (6 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
pfring
Children:
06926f4
Parents:
4c365fa
Message:

Added parallel API support for pf_ring

Also added a general purpose "count our CPUs" function to
format_helper (thanks Perry for linking me some example code
on how to do this!).

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/format_helper.c

    rd8b05b7 r274770a  
    4646#include <time.h>
    4747#include "format_helper.h"
     48#include <unistd.h>
     49#ifndef _SC_NPROCESSORS_ONLN
     50#include <sys/param.h>
     51#include <sys/sysctl.h>
     52#endif
    4853
    4954#include <assert.h>
     
    331336        va_end(va);
    332337}
     338
     339uint32_t trace_get_number_of_cores(void) {
     340
     341        uint32_t t = 0;
     342#ifdef _SC_NPROCESSORS_ONLN
     343        t = sysconf(_SC_NPROCESSORS_ONLN);
     344        if (t < 1 || t > ((uint32_t)1 << 31))
     345                t = sysconf(_SC_NPROCESSORS_CONF);
     346#else
     347        int nm[2];
     348        size_t len = 4;
     349       
     350        nm[0] = CTL_HW; nm[1] = HW_AVAILCPU;
     351        sysctl(nm, 2, &t, &len, NULL, 0);
     352       
     353        if (t < 1) {
     354                nm[1] = HW_NCPU;
     355                sysctl(nm, 2, &t, &len, NULL, 0);
     356        }
     357#endif
     358        if (t < 1 || t > ((uint32_t)1 << 31))
     359                t = 4;
     360        return t;
     361}
  • lib/format_helper.h

    rd8b05b7 r274770a  
    9595                int level,
    9696                int filemode);
     97
     98/** Determines the number of cores available on the host.
     99 *
     100 * @return The number of cores detected by this function.
     101 */
     102uint32_t trace_get_number_of_cores(void);
    97103#endif /* FORMAT_HELPER_H */
  • lib/format_pfring.c

    r4c365fa r274770a  
    2828 */
    2929
     30#define _GNU_SOURCE
    3031#include "config.h"
    3132#include "libtrace.h"
     
    186187}
    187188
     189static inline uint32_t pfring_flags(libtrace_t *libtrace) {
     190        uint32_t flags = PF_RING_TIMESTAMP | PF_RING_LONG_HEADER;
     191        flags |= PF_RING_HW_TIMESTAMP;
     192
     193        if (FORMAT_DATA->promisc > 0)
     194                flags |= PF_RING_PROMISC;
     195        return flags;
     196}       
     197
    188198static int pfring_start_input(libtrace_t *libtrace) {
    189199        struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST;
     200        int rc = 0;
     201
    190202        if (libtrace->uridata == NULL) {
    191203                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     
    193205                return -1;
    194206        }
    195         uint32_t flags = PF_RING_TIMESTAMP | PF_RING_LONG_HEADER;
    196         flags |= PF_RING_HW_TIMESTAMP;
    197 
    198         if (FORMAT_DATA->promisc > 0)
    199                 flags |= PF_RING_PROMISC;
    200        
    201207        if (FORMAT_DATA->ringenabled) {
    202208                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     
    205211        }
    206212
    207         stream->pd = pfring_open(libtrace->uridata, FORMAT_DATA->snaplen, flags);
    208 
    209         return pfring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
    210 }
     213        stream->pd = pfring_open(libtrace->uridata, FORMAT_DATA->snaplen,
     214                pfring_flags(libtrace));
     215        if (stream->pd == NULL) {
     216                trace_set_err(libtrace, errno, "pfring_open failed: %s",
     217                                strerror(errno));
     218                return -1;
     219        }
     220
     221        rc = pfring_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     222        if (rc < 0)
     223                return rc;     
     224        FORMAT_DATA->ringenabled = 1;
     225        return rc;
     226}
     227
     228static int pfring_pstart_input(libtrace_t *libtrace) {
     229        pfring *ring[MAX_NUM_RX_CHANNELS];
     230        uint8_t channels;
     231        struct pfring_per_stream_t empty = ZERO_PFRING_STREAM;
     232        int i, iserror = 0;
     233       
     234        if (libtrace->uridata == NULL) {
     235                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
     236                                "Missing interface name from pfring: URI");
     237                return -1;
     238        }
     239        if (FORMAT_DATA->ringenabled) {
     240                trace_set_err(libtrace, TRACE_ERR_BAD_STATE,
     241                        "Attempted to start a pfring: input that was already started!");
     242                return -1;
     243        }
     244
     245        channels = pfring_open_multichannel(libtrace->uridata,
     246                        FORMAT_DATA->snaplen, pfring_flags(libtrace), ring);
     247        if (channels <= 0) {
     248                trace_set_err(libtrace, errno,
     249                                "pfring_open_multichannel failed: %s",
     250                                strerror(errno));
     251                return -1;
     252        }
     253
     254        printf("got %u channels\n", channels);
     255
     256        if (libtrace->perpkt_thread_count < channels) {
     257                fprintf(stderr, "WARNING: pfring interface has %u channels, "
     258                                "but this libtrace program has only enough "
     259                                "threads to read the first %u channels.",
     260                                channels, libtrace->perpkt_thread_count);
     261        }
     262
     263        if (channels < libtrace->perpkt_thread_count)
     264                libtrace->perpkt_thread_count = channels;
     265       
     266
     267        for (i = 0; i < channels; i++) {
     268                struct pfring_per_stream_t *stream;
     269                if (libtrace_list_get_size(FORMAT_DATA->per_stream)<=(size_t)i)
     270                        libtrace_list_push_back(FORMAT_DATA->per_stream, &empty);
     271
     272                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     273                stream->pd = ring[i];
     274                if (pfring_start_input_stream(libtrace, stream) != 0) {
     275                        iserror = 1;
     276                        break;
     277                }
     278        }
     279
     280        if (iserror) {
     281                /* Error state: free any streams we managed to create */
     282                for (i = i - 1; i >= 0; i--) {
     283                        struct pfring_per_stream_t *stream;
     284                        stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     285
     286                        pfring_disable_ring(stream->pd);       
     287                        pfring_remove_bpf_filter(stream->pd);
     288                        pfring_close(stream->pd);
     289                }
     290                return -1;
     291        }
     292        FORMAT_DATA->ringenabled = 1;
     293        return 0;
     294}
     295
    211296
    212297static int pfring_init_input(libtrace_t *libtrace) {
     
    339424
    340425static int pfring_read_generic(libtrace_t *libtrace, libtrace_packet_t *packet,
    341                 uint8_t block)
     426                struct pfring_per_stream_t *stream, uint8_t block,
     427                libtrace_message_queue_t *queue)
    342428{
    343429
    344         struct pfring_per_stream_t *stream = FORMAT_DATA_FIRST;
    345430        struct libtrace_pfring_header *hdr;
    346431        struct local_pfring_header local;
     
    355440                }
    356441        }
    357 
     442       
    358443        hdr = (struct libtrace_pfring_header *)packet->buffer;
    359         if ((rc = pfring_recv(stream->pd, (u_char **)&packet->payload,
    360                         0, (struct pfring_pkthdr *)&local, block)) == -1)
    361         {
    362                 trace_set_err(libtrace, errno, "Failed to read packet from pfring:");
    363                 return -1;
    364         }
    365 
    366         /* We were asked not to block and there are no packets available */
     444        while (block) {
     445                if ((rc = pfring_recv(stream->pd, (u_char **)&packet->payload,
     446                        0, (struct pfring_pkthdr *)&local, 0)) == -1)
     447                {
     448                        trace_set_err(libtrace, errno, "Failed to read packet from pfring:");
     449                        return -1;
     450                }
     451
     452                if (rc == 0) {
     453                        if (queue && libtrace_message_queue_count(queue) > 0)
     454                                return READ_MESSAGE;
     455                        continue;
     456                }
     457                break;
     458        }
     459
    367460        if (rc == 0)
    368461                return 0;
     
    392485        packet->type = TRACE_RT_DATA_PFRING;
    393486        packet->header = packet->buffer;
     487        packet->error = 1;
    394488
    395489        return pfring_get_capture_length(packet) +
     
    400494static int pfring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    401495{
    402         return pfring_read_generic(libtrace, packet, 1);
     496        return pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 1, NULL);
    403497}
    404498
     
    497591        int rc;
    498592
    499         rc = pfring_read_generic(libtrace, packet, 0);
     593        rc = pfring_read_generic(libtrace, packet, FORMAT_DATA_FIRST, 0, NULL);
    500594       
    501595        if (rc > 0) {
     
    515609}
    516610
    517 static int pfring_pstart_input(libtrace_t *libtrace) {
    518         trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED, "Haven't implemented parallel support for pfring yet!");
    519         return -1;
    520 }
    521 
    522611static int pfring_pread_packets(libtrace_t *libtrace,
    523                 libtrace_thread_t *t UNUSED, libtrace_packet_t *packets[] UNUSED,
    524                 UNUSED size_t nb_packets) {
    525 
    526         trace_set_err(libtrace, TRACE_ERR_UNSUPPORTED, "Haven't implemented parallel support for pfring yet!");
    527         return -1;
    528 }
    529 
    530 
     612                libtrace_thread_t *t,
     613                libtrace_packet_t *packets[],
     614                size_t nb_packets) {
     615
     616        size_t readpackets = 0;
     617        int rc = 0;
     618        struct pfring_per_stream_t *stream = (struct pfring_per_stream_t *)t->format_data;
     619        uint8_t block = 1;
     620
     621        /* Block for the first packet, then read up to nb_packets if they
     622         * are available. */
     623        do {
     624                rc = pfring_read_generic(libtrace, packets[readpackets],
     625                        stream, block, &t->messages);
     626                if (rc == READ_MESSAGE) {
     627                        if (readpackets == 0) {
     628                                return rc;
     629                        }
     630                        break;
     631                }
     632                               
     633                if (rc == READ_ERROR)
     634                        return rc;
     635
     636                if (rc == 0)
     637                        continue;
     638               
     639                block = 0;
     640                readpackets ++;
     641                if (readpackets >= nb_packets)
     642                        break;
     643
     644        } while (rc != 0);
     645
     646        return readpackets;
     647}
     648
     649static int pfring_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
     650                bool reading) {
     651
     652        uint32_t cpus = trace_get_number_of_cores();
     653
     654        if (reading) {
     655                struct pfring_per_stream_t *stream;
     656                int tid = 0;
     657                if (t->type == THREAD_PERPKT) {
     658                        t->format_data = libtrace_list_get_index(FORMAT_DATA->per_stream, t->perpkt_num)->data;
     659                        if (t->format_data == NULL) {
     660                                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     661                                                "Too many threads registered");
     662                                return -1;
     663                        }
     664                        tid = t->perpkt_num;
     665                } else {
     666                        t->format_data = FORMAT_DATA_FIRST;
     667                }
     668
     669                stream = t->format_data;
     670                if (cpus > 1) {
     671                        cpu_set_t cpuset;
     672                        uint32_t coreid;
     673                        int s;
     674
     675                        coreid = (tid + 1) % cpus;
     676                        CPU_ZERO(&cpuset);
     677                        CPU_SET(coreid, &cpuset);
     678                        if ((s = pthread_setaffinity_np(pthread_self(), sizeof(cpuset), &cpuset)) != 0) {
     679                                trace_set_err(libtrace, errno, "Warning "
     680                                                "failed to set affinity for "
     681                                                "pfring thread");
     682                                return -1;
     683                        }
     684                        stream->affinity = coreid;
     685                }
     686        }
     687
     688        return 0;               
     689
     690}
    531691
    532692static struct libtrace_format_t pfringformat = {
     
    571731        NULL,                      /* help */
    572732        NULL,                   /* next pointer */
    573         {true, -1},                     /* Live, no thread limit */
     733        {true, MAX_NUM_RX_CHANNELS},         /* Live, with thread limit */
    574734        pfring_pstart_input,         /* pstart_input */
    575735        pfring_pread_packets,        /* pread_packets */
    576736        pfring_pause_input,        /* ppause */
    577737        pfring_fin_input,          /* p_fin */
    578         NULL,                   /* register thread XXX */
     738        pfring_pregister_thread,        /* register thread */
    579739        NULL,                           /* unregister thread */
    580740        NULL                            /* get thread stats */
Note: See TracChangeset for help on using the changeset viewer.