Ignore:
File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_linux.c

    rb13b939 r53eb1aa  
    7272#include <sys/mman.h>
    7373
    74 #include <fcntl.h>
    75 
    7674/* MAX_ORDER is defined in linux/mmzone.h. 10 is default for 2.4 kernel.
    7775 * max_order will be decreased by one if the ring buffer fails to allocate.
     
    149147#define PACKET_HDRLEN   11
    150148#define PACKET_TX_RING  13
    151 #define PACKET_FANOUT   18
    152149#define TP_STATUS_USER  0x1
    153150#define TP_STATUS_SEND_REQUEST  0x1
     
    157154#define TPACKET_ALIGN(x)        (((x)+TPACKET_ALIGNMENT-1)&~(TPACKET_ALIGNMENT-1))
    158155#define TPACKET_HDRLEN         (TPACKET_ALIGN(sizeof(struct tpacket2_hdr)) + sizeof(struct sockaddr_ll))
    159 
    160 /* Since 3.1 kernel we have packet_fanout support */
    161 // schedule to socket by skb's rxhash - the implementation is bi-directional
    162 #define PACKET_FANOUT_HASH              0
    163 // schedule round robin
    164 #define PACKET_FANOUT_LB                1
    165 // schedule to the same socket that received the packet
    166 #define PACKET_FANOUT_CPU               2
    167 // Something to do with fragmented packets and hashing problems !! TODO figure out if this needs to be on
    168 #define PACKET_FANOUT_FLAG_DEFRAG       0x8000
    169 /* Included but unused by libtrace since 3.10 */
    170 // if one socket if full roll over to the next
    171 #define PACKET_FANOUT_ROLLOVER          3
    172 // This flag makes any other system roll over
    173 #define PACKET_FANOUT_FLAG_ROLLOVER     0x1000
    174 /* Included but unused by libtrace since 3.12 */
    175 // schedule random
    176 #define PACKET_FANOUT_RND               4
    177 
    178156
    179157enum tpacket_versions {
     
    206184        unsigned int tp_frame_size;  /* Size of frame */
    207185        unsigned int tp_frame_nr;    /* Total number of frames */
    208 };
    209 
    210 struct linux_per_thread_t {
    211         char *rx_ring;
    212         int rxring_offset;
    213         int fd;
    214         // The flag layout should be the same for all (I Hope)
    215         // max_order
    216186};
    217187
     
    242212        /* Used to determine buffer size for the ring buffer */
    243213        uint32_t max_order;
    244         /* Used for the parallel case, fanout is the mode */
    245         uint16_t fanout_flags;
    246         /* The group lets Linux know which sockets to group together
    247          * so we use a random here to try avoid collisions */
    248         uint16_t fanout_group;
    249         /* When running in parallel mode this is malloc'd with an array
    250          * file descriptors from packet fanout will use, here we assume/hope
    251          * that every ring can get setup the same */
    252         struct linux_per_thread_t *per_thread;
    253214};
    254215
     
    406367        FORMAT(libtrace->format_data)->rxring_offset = 0;
    407368        FORMAT(libtrace->format_data)->max_order = MAX_ORDER;
    408         FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB; // This might be best or alternatively PACKET_FANOUT_LB
    409         // Some examples use pid for the group however that would limit a single
    410         // application to use only int/ring format, instead using rand
    411         FORMAT(libtrace->format_data)->fanout_group = (uint16_t) rand();
    412         FORMAT(libtrace->format_data)->per_thread = NULL;
    413369}
    414370static int linuxring_init_input(libtrace_t *libtrace)
    415371{       
    416372        init_input(libtrace);
    417         FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_RING;
     373        FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_RING;
    418374        return 0;
    419375}
     
    421377{
    422378        init_input(libtrace);
    423         FORMAT(libtrace->format_data)->format = TRACE_RT_DATA_LINUX_NATIVE;
     379        FORMAT(libtrace->format_data)->format = TRACE_FORMAT_LINUX_NATIVE;
    424380        return 0;
    425381}
     
    626582        return 0;
    627583}
    628 
    629 /**
    630  * Converts a socket, either packet_mmap or standard raw socket into a
    631  * fanout socket.
    632  * NOTE: This means we can read from the socket with multiple queues,
    633  * each must be setup (identically) and then this called upon them
    634  *
    635  * @return 0 success, -1 error
    636  */
    637 static inline int socket_to_packet_fanout(int fd,
    638                                         uint16_t fanout_flags,
    639                                         uint16_t fanout_group) {
    640         int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group;
    641         if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
    642                         &fanout_opt, sizeof(fanout_opt)) == -1) {
    643                 return -1;
    644         }
    645         return 0;
    646 }
    647 
    648 static int linuxnative_ppause_input(libtrace_t *libtrace)
    649 {
    650         int i;
    651         int tot = libtrace->perpkt_thread_count;
    652         printf("CAlling native pause packet\n");
    653        
    654         for (i = 0; i < tot; i++) {
    655                 close(FORMAT(libtrace->format_data)->per_thread[i].fd);
    656         }
    657        
    658         free(FORMAT(libtrace->format_data)->per_thread);
    659         FORMAT(libtrace->format_data)->per_thread = NULL;
    660         return 0;
    661 }
    662 
    663 static int linuxring_start_input(libtrace_t *libtrace)
    664 {
    665         char error[2048];
     584static int linuxring_start_input(libtrace_t *libtrace){
     585
     586        char error[2048];       
    666587
    667588        /* We set the socket up the same and then convert it to PACKET_MMAP */
     
    688609}
    689610
    690 static int linuxnative_pstart_input(libtrace_t *libtrace) {
    691         int i = 0;
    692         int tot = libtrace->perpkt_thread_count;
    693         int iserror = 0;
    694         // We store this here otherwise it will be leaked if the memory doesn't know
    695         struct linux_per_thread_t *per_thread = NULL;
    696        
    697         if (!FORMAT(libtrace->format_data)->per_thread) {
    698                 per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
    699                 FORMAT(libtrace->format_data)->per_thread = per_thread;
    700         } else {
    701                 // Whats going on this might not work 100%
    702                 // We assume all sockets have been closed ;)
    703                 printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
    704         }
    705        
    706         printf("Calling native pstart packet\n");
    707         for (i = 0; i < tot; ++i)
    708         {
    709                 if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
    710                         if (linuxnative_start_input(libtrace) != 0) {
    711                                 iserror = 1;
    712                                 break;
    713                         }
    714                 } else {
    715                         // This must be ring
    716                         if (linuxring_start_input(libtrace) != 0) {
    717                                 iserror = 1;
    718                                 break;
    719                         }
    720                 }
    721                 if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0)
    722                 {
    723                         iserror = 1;
    724                         // Clean up here to keep consistent with every one else
    725                         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed");
    726                         close(FORMAT(libtrace->format_data)->fd);
    727                         free(libtrace->format_data);
    728                         libtrace->format_data = NULL;
    729                         break;
    730                 }
    731                 per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
    732                 if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
    733                         per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
    734                         per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
    735                 }
    736         }
    737        
    738         // Roll back those that failed - by this point in time the format_data
    739         // has been freed
    740         if (iserror) {
    741                 for (i = i - 1; i >= 0; i--) {
    742                         close(per_thread[i].fd);
    743                 }
    744                 free(per_thread);
    745                 per_thread = NULL;
    746                 return -1;
    747         }
    748        
    749         return 0;
    750 }
    751 
    752611static int linuxnative_start_output(libtrace_out_t *libtrace)
    753612{
     
    756615                free(DATAOUT(libtrace));
    757616                return -1;
    758         }
     617        }       
    759618
    760619        return 0;
     
    801660        return 0;
    802661}
    803 
    804662static int linuxring_pause_input(libtrace_t *libtrace)
    805663{
     
    942800#endif /* HAVE_NETPACKET_PACKET_H */
    943801
    944 
    945 static int linuxnative_pconfig_input(libtrace_t *libtrace,
    946                 trace_parallel_option_t option,
    947                 void *data)
    948 {
    949         switch(option) {
    950                 case TRACE_OPTION_SET_HASHER:
    951                         switch (*((enum hasher_types *)data)) {
    952                                 case HASHER_BALANCE:
    953                                         // Do fanout
    954                                         FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB;
    955                                         // Or we could balance to the CPU
    956                                         return 0;
    957                                 case HASHER_BIDIRECTIONAL:
    958                                 case HASHER_UNIDIRECTIONAL:
    959                                         FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH;
    960                                         return 0;
    961                                 case HASHER_CUSTOM:
    962                                 case HASHER_HARDWARE:
    963                                         return -1;
    964                         }
    965                         break;
    966                 /* Avoid default: so that future options will cause a warning
    967                  * here to remind us to implement it, or flag it as
    968                  * unimplementable
    969                  */
    970         }
    971        
    972         /* Don't set an error - trace_config will try to deal with the
    973          * option and will set an error if it fails */
    974         return -1;
    975 }
    976 
    977 
    978802static int linuxnative_prepare_packet(libtrace_t *libtrace UNUSED,
    979803                libtrace_packet_t *packet, void *buffer,
     
    1047871
    1048872#ifdef HAVE_NETPACKET_PACKET_H
    1049 libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ;
    1050 inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue)
     873static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    1051874{
    1052875        struct libtrace_linuxnative_header *hdr;
     
    1056879        struct cmsghdr *cmsg;
    1057880        int snaplen;
    1058 
    1059881        uint32_t flags = 0;
    1060882       
     
    1092914        iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr));
    1093915        iovec.iov_len = snaplen;
    1094        
    1095         if (check_queue) {
    1096                 // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
    1097                 hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT);
    1098                 if ((unsigned) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
    1099                         // Do message queue check or select
    1100                         int ret;
    1101                         fd_set rfds;
    1102                         FD_ZERO(&rfds);
    1103                         FD_SET(fd, &rfds);
    1104                         FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds);
    1105                         int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? fd : get_thread_table(libtrace)->messages.pipefd[0];
    1106                        
    1107                         do {
    1108                                 ret = select(largestfd+1, &rfds, NULL, NULL, NULL);
    1109                                 if (ret == -1 && errno != EINTR)
    1110                                         perror("Select() failed");
    1111                         }
    1112                         while (ret == -1);
    1113                        
    1114                         assert (ret == 1 || ret == 2); // No timeout 0 is not an option
    1115                        
    1116                         if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) {
    1117                                 // Not an error but check the message queue we have something
    1118                                 return -2;
    1119                         }
    1120                         // Otherwise we must have a packet
    1121                         hdr->wirelen = recvmsg(fd, &msghdr, 0);
    1122                 }
    1123         } else {
    1124                 hdr->wirelen = recvmsg(fd, &msghdr, 0);
    1125         }
    1126        
     916
     917        hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, MSG_TRUNC);
     918
    1127919        if (hdr->wirelen==~0U) {
    1128920                trace_set_err(libtrace,errno,"recvmsg");
     
    1172964        if (cmsg == NULL) {
    1173965                struct timeval tv;
    1174                 if (ioctl(fd, SIOCGSTAMP,&tv)==0) {
     966                if (ioctl(FORMAT(libtrace->format_data)->fd,
     967                                  SIOCGSTAMP,&tv)==0) {
    1175968                        hdr->tv.tv_sec = tv.tv_sec;
    1176969                        hdr->tv.tv_usec = tv.tv_usec;
     
    1193986}
    1194987
    1195 static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    1196 {
    1197         int fd = FORMAT(libtrace->format_data)->fd;
    1198         return linuxnative_read_packet_fd(libtrace, packet, fd, 0);
    1199 }
    1200 
    1201 static int linuxnative_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    1202 {
    1203         int fd = FORMAT(libtrace->format_data)->per_thread[get_thread_table_num(libtrace)].fd;
    1204         //printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
    1205         return linuxnative_read_packet_fd(libtrace, packet, fd, 1);
    1206 }
    1207 
    1208988#define LIBTRACE_BETWEEN(test,a,b) ((test) >= (a) && (test) < (b))
    1209989static int linuxring_get_capture_length(const libtrace_packet_t *packet);
     
    1212992/* Release a frame back to the kernel or free() if it's a malloc'd buffer
    1213993 */
    1214 inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet){
     994inline static void ring_release_frame(libtrace_t *libtrace, libtrace_packet_t *packet ){
    1215995        /* Free the old packet */
    1216996        if(packet->buffer == NULL)
     
    12241004                struct linux_format_data_t *ftd = FORMAT(libtrace->format_data);
    12251005               
    1226                 /* Check it's within our buffer first - consider the pause resume case it might have already been free'd lets hope we get another buffer */
    1227                 // For now let any one free anything
    1228                 /*if(LIBTRACE_BETWEEN((char *) packet->buffer,
     1006                /* Check it's within our buffer first */
     1007                if(LIBTRACE_BETWEEN((char *) packet->buffer,
    12291008                                (char *) ftd->rx_ring,
    12301009                                ftd->rx_ring
    1231                                 + ftd->req.tp_block_size * ftd->req.tp_block_nr)){*/
     1010                                + ftd->req.tp_block_size * ftd->req.tp_block_nr)){
    12321011                        TO_TP_HDR(packet->buffer)->tp_status = 0;
    12331012                        packet->buffer = NULL;
    1234                 /*}*/
    1235         }
    1236 }
    1237 
    1238 /**
    1239  * Free any resources being kept for this packet, Note: libtrace
    1240  * will ensure all fields are zeroed correctly.
    1241  */
    1242 static void linuxring_fin_packet(libtrace_packet_t *packet)
    1243 {
    1244         assert(packet->trace);
    1245        
    1246         // Started should always match the existence of the rx_ring
    1247         assert(!!FORMAT(packet->trace->format_data)->rx_ring == !!packet->trace->started);
    1248        
    1249         // Our packets are always under our control
    1250         assert(packet->buf_control == TRACE_CTRL_EXTERNAL);
    1251        
    1252         if (FORMAT(packet->trace->format_data)->rx_ring) // If we don't have a ring its already been destroyed or paused
    1253                 ring_release_frame(packet->trace, packet);
    1254         else
    1255                 packet->buffer = NULL;
    1256 }
    1257 
    1258 inline static int linuxring_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, int *rxring_offset, char *rx_ring, int message) {
     1013                }
     1014        }
     1015}
     1016
     1017static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    12591018
    12601019        struct tpacket2_hdr *header;
     1020        struct pollfd pollset; 
    12611021        int ret;
     1022        unsigned int snaplen;
    12621023       
    12631024        ring_release_frame(libtrace, packet);
     
    12671028       
    12681029        /* Fetch the current frame */
    1269         header = ((void*) rx_ring) + *rxring_offset * FORMAT(libtrace->format_data)->req.tp_frame_size; // GET_CURRENT_BUFFER(libtrace);
     1030        header = GET_CURRENT_BUFFER(libtrace);
    12701031        assert((((unsigned long) header) & (pagesize - 1)) == 0);
    12711032
     
    12751036         */
    12761037        while (!(header->tp_status & TP_STATUS_USER)) {
    1277                 if (message) {
    1278                         struct pollfd pollset[2];
    1279                         pollset[0].fd = fd;
    1280                         pollset[0].events = POLLIN;
    1281                         pollset[0].revents = 0;
    1282                         pollset[1].fd = libtrace_message_queue_get_fd(&get_thread_table(libtrace)->messages);
    1283                         pollset[1].events = POLLIN;
    1284                         pollset[1].revents = 0;
    1285                         /* Wait for more data or a message*/
    1286                         ret = poll(pollset, 2, -1);
    1287                         if (ret < 0) {
    1288                                 if (errno != EINTR)
    1289                                         trace_set_err(libtrace,errno,"poll()");
    1290                                 return -1;
    1291                         }
    1292                         // Check for a message otherwise loop
    1293                         if (pollset[1].revents)
    1294                                 return -2;
    1295                 } else {
    1296                         struct pollfd pollset;
    1297                         pollset.fd = fd;
    1298                         pollset.events = POLLIN;
    1299                         pollset.revents = 0;
    1300 
    1301                         /* Wait for more data or a message*/
    1302                         ret = poll(&pollset, 1, -1);
    1303                         if (ret < 0) {
    1304                                 if (errno != EINTR)
    1305                                         trace_set_err(libtrace,errno,"poll()");
    1306                                 return -1;
    1307                         }
     1038                pollset.fd = FORMAT(libtrace->format_data)->fd;
     1039                pollset.events = POLLIN;
     1040                pollset.revents = 0;
     1041                /* Wait for more data */
     1042                ret = poll(&pollset, 1, -1);
     1043                if (ret < 0) {
     1044                        if (errno != EINTR)
     1045                                trace_set_err(libtrace,errno,"poll()");
     1046                        return -1;
    13081047                }
    13091048        }
     
    13111050        packet->buffer = header;
    13121051
     1052        /* If a snaplen was configured, automatically truncate the packet to
     1053         * the desired length.
     1054         */
     1055        snaplen=LIBTRACE_MIN(
     1056                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*header),
     1057                        (int)FORMAT(libtrace->format_data)->snaplen);
     1058       
     1059        TO_TP_HDR(packet->buffer)->tp_snaplen = LIBTRACE_MIN((unsigned int)snaplen, TO_TP_HDR(packet->buffer)->tp_len);
     1060
    13131061        /* Move to next buffer */
    1314         (*rxring_offset)++;
    1315         *rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
     1062        FORMAT(libtrace->format_data)->rxring_offset++;
     1063        FORMAT(libtrace->format_data)->rxring_offset %= FORMAT(libtrace->format_data)->req.tp_frame_nr;
    13161064
    13171065        /* We just need to get prepare_packet to set all our packet pointers
     
    13231071                                linuxring_get_capture_length(packet);
    13241072
    1325 }
    1326 
    1327 static int linuxring_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1328         int fd = FORMAT(libtrace->format_data)->fd;
    1329         int *rxring_offset = &FORMAT(libtrace->format_data)->rxring_offset;
    1330         char *rx_ring = FORMAT(libtrace->format_data)->rx_ring;
    1331         return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 0);
    1332 }
    1333 
    1334 static int linuxring_pread_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
    1335         int tnum = get_thread_table_num(libtrace);
    1336         int fd = FORMAT(libtrace->format_data)->per_thread[tnum].fd;
    1337         int *rxring_offset = &FORMAT(libtrace->format_data)->per_thread[tnum].rxring_offset;
    1338         char *rx_ring = FORMAT(libtrace->format_data)->per_thread[tnum].rx_ring;
    1339         printf("Thread number is #%d point %p\n", get_thread_table_num(libtrace), FORMAT(libtrace->format_data)->per_thread);
    1340         return linuxring_read_packet_fd(libtrace, packet, fd, rxring_offset, rx_ring, 1);
    13411073}
    13421074
     
    18281560        trace_event_device,             /* trace_event */
    18291561        linuxnative_help,               /* help */
    1830         NULL,                                   /* next pointer */
    1831         {true, -1},              /* Live, no thread limit */
    1832         linuxnative_pstart_input,                       /* pstart_input */
    1833         linuxnative_pread_packet,                       /* pread_packet */
    1834         linuxnative_ppause_input,                       /* ppause */
    1835         linuxnative_fin_input,                          /* p_fin */
    1836         linuxnative_pconfig_input                       /* pconfig input */
     1562        NULL
    18371563};
    18381564
     
    18541580        linuxring_read_packet,  /* read_packet */
    18551581        linuxring_prepare_packet,       /* prepare_packet */
    1856         linuxring_fin_packet,                           /* fin_packet */
     1582        NULL,                           /* fin_packet */
    18571583        linuxring_write_packet, /* write_packet */
    18581584        linuxring_get_link_type,        /* get_link_type */
     
    18771603        linuxring_event,                /* trace_event */
    18781604        linuxring_help,         /* help */
    1879         NULL,                           /* next pointer */
    1880         {true, -1},              /* Live, no thread limit */
    1881         linuxnative_pstart_input,                       /* pstart_input */
    1882         linuxring_pread_packet,                 /* pread_packet */
    1883         linuxnative_ppause_input,                       /* ppause */
    1884         linuxnative_fin_input,                          /* p_fin */
    1885         linuxnative_pconfig_input
    1886        
     1605        NULL
    18871606};
    18881607#else
     
    19361655        trace_event_device,             /* trace_event */
    19371656        linuxnative_help,               /* help */
    1938         NULL,                   /* next pointer */
    1939         NON_PARALLEL(true)
     1657        NULL
    19401658};
    19411659
     
    19801698        NULL,                           /* trace_event */
    19811699        linuxring_help,                 /* help */
    1982         NULL,                   /* next pointer */
    1983         NON_PARALLEL(true)
     1700        NULL
    19841701};
    19851702
Note: See TracChangeset for help on using the changeset viewer.