Changeset 771ab22


Ignore:
Timestamp:
02/11/15 18:47:20 (6 years ago)
Author:
Richard Sanger <rsangerarj@…>
Branches:
4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, master, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Children:
6cf3ca0
Parents:
1871afc
Message:

Bring linux int: back to a compiling and working condition

Location:
lib
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • lib/Makefile.am

    r035f8a7 r771ab22  
    2929NATIVEFORMATS+= format_dpdk.c
    3030# So we also make libtrace.mk in dpdk otherwise automake tries to expand
    31 # it to early which I cannot seem to stop unless we use a path that
     31# it too early which I cannot seem to stop unless we use a path that
    3232# doesn't exist currently
    3333export RTE_SDK=@RTE_SDK@
  • lib/format_linux.c

    r035f8a7 r771ab22  
    7272static int linuxnative_init_input(libtrace_t *libtrace)
    7373{
    74         struct linux_per_stream_t stream_data;
     74        struct linux_per_stream_t stream_data= ZERO_LINUX_STREAM;
    7575
    7676        libtrace->format_data = (struct linux_format_data_t *)
     
    8282        assert(FORMAT_DATA->per_stream != NULL);
    8383
    84         /* We'll start with just one instance of stream_data, and we'll
    85          * add more later if we need them */
    86         memset(&stream_data, 0, sizeof(stream_data));
    8784        libtrace_list_push_back(FORMAT_DATA->per_stream, &stream_data);
    8885
    89         FORMAT_DATA_FIRST->fd = -1;
    9086        FORMAT_DATA->promisc = -1;
    9187        FORMAT_DATA->snaplen = LIBTRACE_PACKET_BUFSIZE;
     
    10399static int linuxnative_init_output(libtrace_out_t *libtrace)
    104100{
    105         libtrace->format_data = (struct linux_output_format_data_t*)
    106                 malloc(sizeof(struct linux_output_format_data_t));
     101        libtrace->format_data = (struct linux_format_data_out_t*)
     102                malloc(sizeof(struct linux_format_data_out_t));
    107103        assert(libtrace->format_data != NULL);
    108104
     
    117113}
    118114
    119 static int linuxnative_start_input(libtrace_t *libtrace)
     115/* Close an input stream, this is safe to be called part way through
     116 * initilisation as a cleanup function assuming streams were set to
     117 * ZERO_LINUX_STREAM to begin with.
     118 */
     119static inline void linuxnative_close_input_stream(libtrace_t *libtrace,
     120                                                  struct linux_per_stream_t *stream) {
     121        if (stream->fd != -1)
     122                close(stream->fd);
     123        stream->fd = -1;
     124        /* TODO maybe store size against stream XXX */
     125        if (stream->rx_ring)
     126                munmap(stream->rx_ring,
     127                       FORMAT_DATA->req.tp_block_size *
     128                       FORMAT_DATA->req.tp_block_nr);
     129        stream->rx_ring = NULL;
     130}
     131
     132static inline int linuxnative_start_input_stream(libtrace_t *libtrace,
     133                                                 struct linux_per_stream_t *stream)
    120134{
    121135        struct sockaddr_ll addr;
    122         int one = 1;
     136        const int one = 1;
    123137        memset(&addr,0,sizeof(addr));
    124138        libtrace_filter_t *filter = FORMAT_DATA->filter;
    125139
    126140        /* Create a raw socket for reading packets on */
    127         FORMAT_DATA_FIRST->fd =
    128                 socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
    129         if (FORMAT_DATA_FIRST->fd==-1) {
     141        stream->fd = socket(PF_PACKET, SOCK_RAW, htons(ETH_P_ALL));
     142        if (stream->fd==-1) {
    130143                trace_set_err(libtrace, errno, "Could not create raw socket");
    131                 free(libtrace->format_data);
    132                 libtrace->format_data = NULL;
    133144                return -1;
    134145        }
     
    140151                addr.sll_ifindex = if_nametoindex(libtrace->uridata);
    141152                if (addr.sll_ifindex == 0) {
    142                         close(FORMAT_DATA_FIRST->fd);
     153                        linuxnative_close_input_stream(libtrace, stream);
    143154                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
    144155                                      "Failed to find interface %s",
    145156                                      libtrace->uridata);
    146                         free(libtrace->format_data);
    147                         libtrace->format_data = NULL;
    148157                        return -1;
    149158                }
     
    151160                addr.sll_ifindex = 0;
    152161        }
    153         if (bind(FORMAT_DATA_FIRST->fd,
    154                  (struct sockaddr*)&addr,
    155                  (socklen_t)sizeof(addr))==-1) {
    156                 free(libtrace->format_data);
    157                 libtrace->format_data = NULL;
     162        if (bind(stream->fd,
     163                 (struct sockaddr*)&addr,
     164                 (socklen_t)sizeof(addr))==-1) {
     165                linuxnative_close_input_stream(libtrace, stream);
    158166                trace_set_err(libtrace, errno,
    159167                              "Failed to bind to interface %s",
     
    180188                mreq.mr_ifindex = addr.sll_ifindex;
    181189                mreq.mr_type = PACKET_MR_PROMISC;
    182                 if (setsockopt(FORMAT_DATA_FIRST->fd,
     190                if (setsockopt(stream->fd,
    183191                               SOL_PACKET,
    184192                               PACKET_ADD_MEMBERSHIP,
     
    192200         * clock resolution possible */
    193201#ifdef SO_TIMESTAMPNS
    194         if (setsockopt(FORMAT_DATA_FIRST->fd,
     202        if (setsockopt(stream->fd,
    195203                       SOL_SOCKET,
    196204                       SO_TIMESTAMPNS,
     
    203211         * if we fail the first! */
    204212#endif
    205                 if (setsockopt(FORMAT_DATA_FIRST->fd,
     213                if (setsockopt(stream->fd,
    206214                               SOL_SOCKET,
    207215                               SO_TIMESTAMP,
     
    218226         */
    219227        if (filter != NULL) {
    220                 /* Check if the filter was successfully compiled. If not,
    221                 * it is probably a bad filter and we should return an error
    222                 * before the caller tries to read any packets */
     228                /* Check if the filter was successfully compiled. If not,
     229                * it is probably a bad filter and we should return an error
     230                * before the caller tries to read any packets */
    223231                if (filter->flag == 0) {
    224                         return -1;
    225                 }
    226 
    227                 if (setsockopt(FORMAT_DATA_FIRST->fd,
     232                        linuxnative_close_input_stream(libtrace, stream);
     233                        trace_set_err(libtrace, TRACE_ERR_BAD_FILTER,
     234                                      "Cannot attach a bad filter to %s",
     235                                      libtrace->uridata);
     236                        return -1;
     237                }
     238
     239                if (setsockopt(stream->fd,
    228240                               SOL_SOCKET,
    229241                               SO_ATTACH_FILTER,
     
    237249                         */
    238250                        void *buf = malloc((size_t)LIBTRACE_PACKET_BUFSIZE);
    239                         while(recv(FORMAT_DATA_FIRST->fd,
     251                        while(recv(stream->fd,
    240252                                   buf,
    241253                                   (size_t)LIBTRACE_PACKET_BUFSIZE,
     
    248260
    249261        return 0;
     262}
     263
     264static int linuxnative_start_input(libtrace_t *libtrace)
     265{
     266        int ret = linuxnative_start_input_stream(libtrace, FORMAT_DATA_FIRST);
     267        if (ret != 0) {
     268                libtrace_list_deinit(FORMAT_DATA->per_stream);
     269                free(libtrace->format_data);
     270                libtrace->format_data = NULL;
     271        }
     272        return ret;
    250273}
    251274
     
    258281 * @return 0 success, -1 error
    259282 */
    260 static inline int socket_to_packet_fanout(int fd,
    261                                           uint16_t fanout_flags,
    262                                           uint16_t fanout_group)
    263 {
    264         int fanout_opt = ((int)fanout_flags << 16) | (int)fanout_group;
    265         if (setsockopt(fd, SOL_PACKET, PACKET_FANOUT,
     283static inline int linuxnative_socket_to_packet_fanout(libtrace_t *libtrace,
     284                                                      struct linux_per_stream_t *stream)
     285{
     286        int fanout_opt = ((int)FORMAT_DATA->fanout_flags << 16) | (int)FORMAT_DATA->fanout_group;
     287        if (setsockopt(stream->fd, SOL_PACKET, PACKET_FANOUT,
    266288                        &fanout_opt, sizeof(fanout_opt)) == -1) {
     289                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     290                              "Converting the fd to a socket fanout failed %s",
     291                              libtrace->uridata);
    267292                return -1;
    268293        }
     
    275300        int iserror = 0;
    276301        // We store this here otherwise it will be leaked if the memory doesn't know
    277         struct linux_per_thread_t *per_thread = NULL;
    278        
    279         if (!FORMAT(libtrace->format_data)->per_thread) {
    280                 //per_thread = calloc(tot, sizeof(struct linux_per_thread_t));
    281                 posix_memalign((void **)&per_thread, CACHE_LINE_SIZE, tot*sizeof(struct linux_per_thread_t));
    282                 FORMAT(libtrace->format_data)->per_thread = per_thread;
    283         } else {
    284                 // Whats going on this might not work 100%
    285                 // We assume all sockets have been closed ;)
    286                 printf("Pause and then start called again lets hope that perpkt_thread_count hasn't changed\n");
    287         }
    288        
     302        struct linux_per_stream_t empty_stream = ZERO_LINUX_STREAM;
     303
    289304        printf("Calling native pstart packet\n");
    290305        for (i = 0; i < tot; ++i)
    291306        {
    292                 if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_NATIVE) {
    293                         if (linuxnative_start_input(libtrace) != 0) {
     307                struct linux_per_stream_t *stream;
     308                /* Add storage for another stream */
     309                if (libtrace_list_get_size(FORMAT_DATA->per_stream) <= (size_t) i)
     310                        libtrace_list_push_back(FORMAT_DATA->per_stream, &empty_stream);
     311
     312                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     313                if (FORMAT_DATA->format == TRACE_RT_DATA_LINUX_NATIVE) {
     314                        if (linuxnative_start_input_stream(libtrace, stream) != 0) {
    294315                                iserror = 1;
    295316                                break;
    296317                        }
    297318                } else {
     319                        perror("BAD CODE XXX TODO PUT CODE HERE!!");
    298320                        // This must be ring
     321                        /*
    299322                        if (linuxring_start_input(libtrace) != 0) {
    300323                                iserror = 1;
    301324                                break;
    302                         }
    303                 }
    304                 if (socket_to_packet_fanout(FORMAT(libtrace->format_data)->fd, FORMAT(libtrace->format_data)->fanout_flags, FORMAT(libtrace->format_data)->fanout_group) != 0)
     325                        }*/
     326                }
     327                if (linuxnative_socket_to_packet_fanout(libtrace, stream) != 0)
    305328                {
    306329                        iserror = 1;
    307                         // Clean up here to keep consistent with every one else
    308                         trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Converting the fd to a socket fanout failed");
    309                         close(FORMAT(libtrace->format_data)->fd);
    310                         free(libtrace->format_data);
    311                         libtrace->format_data = NULL;
     330                        close(stream->fd);
     331                        stream->fd = -1;
    312332                        break;
    313333                }
    314                 per_thread[i].fd = FORMAT(libtrace->format_data)->fd;
    315                 if (FORMAT(libtrace->format_data)->format == TRACE_RT_DATA_LINUX_RING) {
    316                         per_thread[i].rxring_offset = FORMAT(libtrace->format_data)->rxring_offset;
    317                         per_thread[i].rx_ring = FORMAT(libtrace->format_data)->rx_ring;
    318                 }
    319         }
    320        
    321         // Roll back those that failed - by this point in time the format_data
    322         // has been freed
     334        }
     335       
     336        // Roll back those that failed
    323337        if (iserror) {
    324338                for (i = i - 1; i >= 0; i--) {
    325                         close(per_thread[i].fd);
    326                 }
    327                 free(per_thread);
    328                 per_thread = NULL;
     339                        struct linux_per_stream_t *stream;
     340                        stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     341                        linuxnative_close_input_stream(libtrace, stream);
     342                }
     343                libtrace_list_deinit(FORMAT_DATA->per_stream);
     344                free(libtrace->format_data);
     345                libtrace->format_data = NULL;
    329346                return -1;
    330347        }
     
    335352static int linux_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t, bool reading) {
    336353        fprintf(stderr, "registering thread %d!!\n", t->perpkt_num);
    337     if (reading) {
    338         if(t->type == THREAD_PERPKT) {
    339             t->format_data = &FORMAT(libtrace->format_data)->per_thread[t->perpkt_num];
    340         } else {
    341             t->format_data = &FORMAT(libtrace->format_data)->per_thread[0];
    342         }
    343     }
    344     return 0;
     354        if (reading) {
     355                /* XXX TODO remove this oneday make sure hasher thread still works */
     356                struct linux_per_stream_t *stream;
     357                stream = libtrace_list_get_index(FORMAT_DATA->per_stream,
     358                                                 t->perpkt_num)->data;
     359                t->format_data = stream;
     360                if (!stream) {
     361                        /* This should never happen and indicates an
     362                         * internal libtrace bug */
     363                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
     364                                      "Failed to attached thread %d to a stream",
     365                                      t->perpkt_num);
     366                        return -1;
     367                }
     368        }
     369        return 0;
    345370}
    346371
    347372static int linuxnative_start_output(libtrace_out_t *libtrace)
    348373{
    349         DATAOUT(libtrace)->fd = socket(PF_PACKET, SOCK_RAW, 0);
    350         if (DATAOUT(libtrace)->fd==-1) {
    351                 free(DATAOUT(libtrace));
     374        FORMAT_DATA_OUT->fd = socket(PF_PACKET, SOCK_RAW, 0);
     375        if (FORMAT_DATA_OUT->fd==-1) {
     376                free(FORMAT_DATA_OUT);
    352377                return -1;
    353378        }
     
    359384static int linuxnative_pause_input(libtrace_t *libtrace)
    360385{
    361         libtrace_list_node_t *tmp = FORMAT_DATA_HEAD;
     386        size_t i;
    362387
    363388        /* Stop and detach each stream */
    364         while (tmp != NULL) {
    365                 close(STREAM_DATA(tmp)->fd);
    366                 tmp = tmp->next;
     389        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
     390                struct linux_per_stream_t *stream;
     391                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     392                linuxnative_close_input_stream(libtrace, stream);
    367393        }
    368394
     
    387413static int linuxnative_fin_output(libtrace_out_t *libtrace)
    388414{
    389         close(DATAOUT(libtrace)->fd);
    390         DATAOUT(libtrace)->fd=-1;
     415        close(FORMAT_DATA_OUT->fd);
     416        FORMAT_DATA_OUT->fd=-1;
    391417        free(libtrace->format_data);
    392418        return 0;
     
    431457
    432458                pcap = pcap_open_dead(dlt,
    433                                 FORMAT(libtrace->format_data)->snaplen);
     459                                FORMAT_DATA->snaplen);
    434460
    435461                if (pcap_compile(pcap, &f->filter, f->filterstring, 0, 0) == -1) {
     
    453479        }
    454480       
    455         if (FORMAT(libtrace->format_data)->filter != NULL)
    456                 free(FORMAT(libtrace->format_data)->filter);
    457        
    458         FORMAT(libtrace->format_data)->filter = f;
     481        if (FORMAT_DATA->filter != NULL)
     482                free(FORMAT_DATA->filter);
     483       
     484        FORMAT_DATA->filter = f;
    459485       
    460486        return 0;
     
    469495        switch(option) {
    470496                case TRACE_OPTION_SNAPLEN:
    471                         FORMAT(libtrace->format_data)->snaplen=*(int*)data;
     497                        FORMAT_DATA->snaplen=*(int*)data;
    472498                        return 0;
    473499                case TRACE_OPTION_PROMISC:
    474                         FORMAT(libtrace->format_data)->promisc=*(int*)data;
     500                        FORMAT_DATA->promisc=*(int*)data;
    475501                        return 0;
    476502                case TRACE_OPTION_FILTER:
     
    505531                                case HASHER_BALANCE:
    506532                                        // Do fanout
    507                                         FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_LB;
     533                                        FORMAT_DATA->fanout_flags = PACKET_FANOUT_LB;
    508534                                        // Or we could balance to the CPU
    509535                                        return 0;
    510536                                case HASHER_BIDIRECTIONAL:
    511537                                case HASHER_UNIDIRECTIONAL:
    512                                         FORMAT(libtrace->format_data)->fanout_flags = PACKET_FANOUT_HASH;
     538                                        FORMAT_DATA->fanout_flags = PACKET_FANOUT_HASH;
    513539                                        return 0;
    514540                                case HASHER_CUSTOM:
     
    533559                libtrace_rt_types_t rt_type, uint32_t flags) {
    534560
    535         if (packet->buffer != buffer &&
    536                         packet->buf_control == TRACE_CTRL_PACKET) {
    537                 free(packet->buffer);
    538         }
    539 
    540         if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
    541                 packet->buf_control = TRACE_CTRL_PACKET;
    542         } else
    543                 packet->buf_control = TRACE_CTRL_EXTERNAL;
    544 
    545 
    546         packet->buffer = buffer;
    547         packet->header = buffer;
     561        if (packet->buffer != buffer &&
     562            packet->buf_control == TRACE_CTRL_PACKET) {
     563                free(packet->buffer);
     564        }
     565
     566        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
     567                packet->buf_control = TRACE_CTRL_PACKET;
     568        } else
     569                packet->buf_control = TRACE_CTRL_EXTERNAL;
     570
     571
     572        packet->buffer = buffer;
     573        packet->header = buffer;
    548574        packet->payload = (char *)buffer +
    549575                sizeof(struct libtrace_linuxnative_header);
     
    567593
    568594#ifdef HAVE_NETPACKET_PACKET_H
    569 libtrace_thread_t * get_thread_table(libtrace_t *libtrace) ;
    570 inline static int linuxnative_read_packet_fd(libtrace_t *libtrace, libtrace_packet_t *packet, int fd, const int check_queue)
     595inline static int linuxnative_read_stream(libtrace_t *libtrace,
     596                                          libtrace_packet_t *packet,
     597                                          struct linux_per_stream_t *stream,
     598                                          libtrace_message_queue_t *queue)
    571599{
    572600        struct libtrace_linuxnative_header *hdr;
     
    578606
    579607        uint32_t flags = 0;
    580         fd_set readfds;
    581         struct timeval tout;
    582         int ret;
     608        fd_set readfds;
     609        struct timeval tout;
     610        int ret;
    583611       
    584612        if (!packet->buffer || packet->buf_control == TRACE_CTRL_EXTERNAL) {
     
    596624        snaplen=LIBTRACE_MIN(
    597625                        (int)LIBTRACE_PACKET_BUFSIZE-(int)sizeof(*hdr),
    598                         (int)FORMAT(libtrace->format_data)->snaplen);
     626                        (int)FORMAT_DATA->snaplen);
    599627        /* Prepare the msghdr and iovec for the kernel to write the
    600628         * captured packet into. The msghdr will point to the part of our
     
    614642        iovec.iov_base = (void*)(packet->buffer+sizeof(*hdr));
    615643        iovec.iov_len = snaplen;
    616        
    617         if (check_queue) {
    618                 // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
    619                 hdr->wirelen = recvmsg(fd, &msghdr, MSG_DONTWAIT | MSG_TRUNC);
    620                 if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
    621                         // Do message queue check or select
    622                         int ret;
    623                         fd_set rfds;
    624                         FD_ZERO(&rfds);
    625                         FD_SET(fd, &rfds);
    626                         FD_SET(get_thread_table(libtrace)->messages.pipefd[0], &rfds);
    627                         int largestfd = fd > get_thread_table(libtrace)->messages.pipefd[0] ? fd : get_thread_table(libtrace)->messages.pipefd[0];
    628                         do {
    629                                 ret = select(largestfd+1, &rfds, NULL, NULL, NULL);
    630                                 if (ret == -1 && errno != EINTR)
    631                                         perror("Select() failed");
     644
     645        // Check for a packet - TODO only Linux has MSG_DONTWAIT should use fctl O_NONBLOCK
     646        /* Try check ahead this should be fast if something is waiting  */
     647        hdr->wirelen = recvmsg(stream->fd, &msghdr, MSG_DONTWAIT | MSG_TRUNC);
     648
     649        /* No data was waiting */
     650        if ((int) hdr->wirelen == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
     651                /* Do message queue check or select */
     652                int message_fd;
     653                int largestfd = stream->fd;
     654
     655                /* Also check the message queue */
     656                if (queue) {
     657                        message_fd = libtrace_message_queue_get_fd(queue);
     658                        if (message_fd > largestfd)
     659                                largestfd = message_fd;
     660                }
     661                do {
     662                        /* Use select to allow us to time out occasionally to check if someone
     663                         * has hit Ctrl-C or otherwise wants us to stop reading and return
     664                         * so they can exit their program.
     665                         */
     666                        tout.tv_sec = 0;
     667                        tout.tv_usec = 500000;
     668                        /* Make sure we reset these each loop */
     669                        FD_ZERO(&readfds);
     670                        FD_SET(stream->fd, &readfds);
     671                        if (queue)
     672                                FD_SET(message_fd, &readfds);
     673
     674                        ret = select(largestfd+1, &readfds, NULL, NULL, &tout);
     675                        if (ret >= 1) {
     676                                /* A file descriptor triggered */
     677                                break;
     678                        } else if (ret < 0 && errno != EINTR) {
     679                                trace_set_err(libtrace, errno, "select");
     680                                return -1;
     681                        } else {
     682                                if (libtrace_halt)
     683                                        return READ_EOF;
    632684                        }
    633                         while (ret == -1);
    634                        
    635                         assert (ret == 1 || ret == 2); // No timeout 0 is not an option
    636                        
    637                         if (FD_ISSET(get_thread_table(libtrace)->messages.pipefd[0], &rfds)) {
    638                                 // Not an error but check the message queue we have something
    639                                 return -2;
    640                         }
    641                         // Otherwise we must have a packet
    642                         hdr->wirelen = recvmsg(fd, &msghdr, MSG_TRUNC);
    643                 }
    644         } else {
    645         /* Use select to allow us to time out occasionally to check if someone
    646          * has hit Ctrl-C or otherwise wants us to stop reading and return
    647          * so they can exit their program.
    648          */
    649 
    650         while (1) {
    651                 tout.tv_sec = 0;
    652                 tout.tv_usec = 500000;
    653                 FD_ZERO(&readfds);
    654                 FD_SET(FORMAT(libtrace->format_data)->fd, &readfds);
    655 
    656                 ret = select(FORMAT(libtrace->format_data)->fd + 1, &readfds,
    657                                 NULL, NULL, &tout);
    658                 if (ret < 0 && errno != EINTR) {
    659                         trace_set_err(libtrace, errno, "select");
    660                         return -1;
    661                 } else if (ret < 0) {
    662                         continue;
    663                 }
    664                
    665                 if (FD_ISSET(FORMAT(libtrace->format_data)->fd, &readfds)) {
    666                         /* There's something available for us to read */
    667                         break;
    668                 }
    669 
    670                
    671                 /* If we get here, we timed out -- check if we should halt */
    672                 if (libtrace_halt)
    673                         return 0;
    674         }
    675         hdr->wirelen = recvmsg(FORMAT(libtrace->format_data)->fd, &msghdr, MSG_TRUNC);
    676         }
    677 
    678        
     685                }
     686                while (ret <= 0);
     687
     688                /* Message waiting? */
     689                if (queue && FD_ISSET(message_fd, &readfds))
     690                        return READ_MESSAGE;
     691
     692                /* We must have a packet */
     693                hdr->wirelen = recvmsg(stream->fd, &msghdr, MSG_TRUNC);
     694        }
     695
    679696        if (hdr->wirelen==~0U) {
    680697                trace_set_err(libtrace,errno,"recvmsg");
     
    724741        if (cmsg == NULL) {
    725742                struct timeval tv;
    726                 if (ioctl(fd, SIOCGSTAMP,&tv)==0) {
     743                if (ioctl(stream->fd, SIOCGSTAMP,&tv)==0) {
    727744                        hdr->tv.tv_sec = tv.tv_sec;
    728745                        hdr->tv.tv_usec = tv.tv_usec;
     
    747764static int linuxnative_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet)
    748765{
    749         int fd = FORMAT(libtrace->format_data)->fd;
    750         return linuxnative_read_packet_fd(libtrace, packet, fd, 0);
     766        return linuxnative_read_stream(libtrace, packet, FORMAT_DATA_FIRST, NULL);
    751767}
    752768
     
    756772                                     UNUSED size_t nb_packets) {
    757773        /* For now just read one packet */
    758         int fd = PERPKT_FORMAT(t)->fd;
    759         packets[0]->error = linuxnative_read_packet_fd(libtrace, packets[0],
    760                                                        fd, 1);
     774        packets[0]->error = linuxnative_read_stream(libtrace, packets[0],
     775                                                       t->format_data, &t->messages);
    761776        if (packets[0]->error >= 1)
    762777                return 1;
     
    765780}
    766781
    767 static int linuxnative_write_packet(libtrace_out_t *trace,
     782static int linuxnative_write_packet(libtrace_out_t *libtrace,
    768783                libtrace_packet_t *packet)
    769784{
     
    776791        hdr.sll_family = AF_PACKET;
    777792        hdr.sll_protocol = 0;
    778         hdr.sll_ifindex = if_nametoindex(trace->uridata);
     793        hdr.sll_ifindex = if_nametoindex(libtrace->uridata);
    779794        hdr.sll_hatype = 0;
    780795        hdr.sll_pkttype = 0;
     
    784799        /* This is pretty easy, just send the payload using sendto() (after
    785800         * setting up the sll header properly, of course) */
    786         ret = sendto(DATAOUT(trace)->fd,
     801        ret = sendto(FORMAT_DATA_OUT->fd,
    787802                        packet->payload,
    788803                        trace_get_capture_length(packet),
     
    791806
    792807        if (ret < 0) {
    793                 trace_set_err_out(trace, errno, "sendto failed");
     808                trace_set_err_out(libtrace, errno, "sendto failed");
    794809        }
    795810
     
    896911}
    897912
    898 static int linuxnative_get_fd(const libtrace_t *trace) {
    899         if (trace->format_data == NULL)
     913static int linuxnative_get_fd(const libtrace_t *libtrace) {
     914        if (libtrace->format_data == NULL)
    900915                return -1;
    901         return FORMAT(trace->format_data)->fd;
     916        return FORMAT_DATA_FIRST->fd;
    902917}
    903918
     
    913928}
    914929
     930#ifdef HAVE_NETPACKET_PACKET_H
     931static void linuxnative_update_statistics(libtrace_t *libtrace) {
     932        struct tpacket_stats stats;
     933        size_t i;
     934        socklen_t len = sizeof(stats);
     935
     936        for (i = 0; i < libtrace_list_get_size(FORMAT_DATA->per_stream); ++i) {
     937                struct linux_per_stream_t *stream;
     938                stream = libtrace_list_get_index(FORMAT_DATA->per_stream, i)->data;
     939                if (stream->fd != -1) {
     940                        if (getsockopt(stream->fd,
     941                                   SOL_PACKET,
     942                                   PACKET_STATISTICS,
     943                                   &stats,
     944                                   &len) == 0) {
     945                                if (FORMAT_DATA->stats_valid==0) {
     946                                        FORMAT_DATA->stats.tp_drops = stats.tp_drops;
     947                                        FORMAT_DATA->stats.tp_packets = stats.tp_packets;
     948                                        FORMAT_DATA->stats_valid = 1;
     949                                } else {
     950                                        FORMAT_DATA->stats.tp_drops += stats.tp_drops;
     951                                        FORMAT_DATA->stats.tp_drops += stats.tp_packets;
     952                                }
     953                        } else {
     954                                perror("getsockopt PACKET_STATISTICS failed");
     955                        }
     956                }
     957        }
     958}
     959#endif
     960
    915961/* Number of packets that passed filtering */
    916 static uint64_t linuxnative_get_captured_packets(libtrace_t *trace) {
    917         struct tpacket_stats stats;
    918 
    919         if (trace->format_data == NULL)
     962static uint64_t linuxnative_get_captured_packets(libtrace_t *libtrace) {
     963        if (libtrace->format_data == NULL)
    920964                return UINT64_MAX;
    921         if (FORMAT(trace->format_data)->fd == -1) {
     965        if (FORMAT_DATA_FIRST->fd == -1) {
    922966                /* This is probably a 'dead' trace so obviously we can't query
    923967                 * the socket for capture counts, can we? */
     
    926970
    927971#ifdef HAVE_NETPACKET_PACKET_H
    928 
    929         if ((FORMAT(trace->format_data)->stats_valid & 1)
    930                 || FORMAT(trace->format_data)->stats_valid == 0) {
    931                 if (FORMAT(trace->format_data)->per_thread) {
    932                         int i;
    933                         FORMAT(trace->format_data)->stats.tp_drops = 0;
    934                         FORMAT(trace->format_data)->stats.tp_packets = 0;
    935                         for (i = 0; i < trace->perpkt_thread_count; ++i) {
    936                                 socklen_t len = sizeof(stats);
    937                                 getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
    938                                            SOL_PACKET,
    939                                            PACKET_STATISTICS,
    940                                            &stats,
    941                                            &len);
    942                                 FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
    943                                 FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
    944                         }
    945                         FORMAT(trace->format_data)->stats_valid |= 1;
    946                 } else {
    947                         socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    948                         getsockopt(FORMAT(trace->format_data)->fd,
    949                                    SOL_PACKET,
    950                                    PACKET_STATISTICS,
    951                                    &FORMAT(trace->format_data)->stats,
    952                                    &len);
    953                         FORMAT(trace->format_data)->stats_valid |= 1;
    954                 }
    955         }
    956 
    957         return FORMAT(trace->format_data)->stats.tp_packets;
     972        linuxnative_update_statistics(libtrace);
     973        if (FORMAT_DATA->stats_valid)
     974                return FORMAT_DATA->stats.tp_packets;
     975        else
     976                return UINT64_MAX;
    958977#else
    959978        return UINT64_MAX;
     
    961980}
    962981
     982
    963983/* Number of packets that got past filtering and were then dropped because
    964  * of lack of space
     984 * of lack of space.
     985 *
     986 * We could also try read from /sys/class/net/ethX/statistics/ to get
     987 * real drop counters and stuff.
    965988 */
    966 static uint64_t linuxnative_get_dropped_packets(libtrace_t *trace) {
    967         struct tpacket_stats stats;
    968         if (trace->format_data == NULL)
     989static uint64_t linuxnative_get_dropped_packets(libtrace_t *libtrace) {
     990        if (libtrace->format_data == NULL)
    969991                return UINT64_MAX;
    970         if (FORMAT(trace->format_data)->fd == -1) {
     992        if (FORMAT_DATA_FIRST->fd == -1) {
    971993                /* This is probably a 'dead' trace so obviously we can't query
    972994                 * the socket for drop counts, can we? */
     
    974996        }
    975997
    976 #ifdef HAVE_NETPACKET_PACKET_H 
    977         if ((FORMAT(trace->format_data)->stats_valid & 2)
    978                 || (FORMAT(trace->format_data)->stats_valid==0)) {
    979                 if (FORMAT(trace->format_data)->per_thread) {
    980                         int i;
    981                         FORMAT(trace->format_data)->stats.tp_drops = 0;
    982                         FORMAT(trace->format_data)->stats.tp_packets = 0;
    983                         for (i = 0; i < trace->perpkt_thread_count; ++i) {
    984                                 socklen_t len = sizeof(stats);
    985                                 getsockopt(FORMAT(trace->format_data)->per_thread[i].fd,
    986                                            SOL_PACKET,
    987                                            PACKET_STATISTICS,
    988                                            &stats,
    989                                            &len);
    990                                 FORMAT(trace->format_data)->stats.tp_drops += stats.tp_drops;
    991                                 FORMAT(trace->format_data)->stats.tp_packets += stats.tp_packets;
    992                         }
    993                         FORMAT(trace->format_data)->stats_valid |= 2;
    994                 } else {
    995                         socklen_t len = sizeof(FORMAT(trace->format_data)->stats);
    996                         getsockopt(FORMAT(trace->format_data)->fd,
    997                                    SOL_PACKET,
    998                                    PACKET_STATISTICS,
    999                                    &FORMAT(trace->format_data)->stats,
    1000                                    &len);
    1001                         FORMAT(trace->format_data)->stats_valid |= 2;
    1002                 }
    1003         }
    1004 
    1005         return FORMAT(trace->format_data)->stats.tp_drops;
     998#ifdef HAVE_NETPACKET_PACKET_H
     999        linuxnative_update_statistics(libtrace);
     1000        if (FORMAT_DATA->stats_valid)
     1001                return FORMAT_DATA->stats.tp_drops;
     1002        else
     1003                return UINT64_MAX;
    10061004#else
    10071005        return UINT64_MAX;
     
    10651063        linuxnative_pstart_input,                       /* pstart_input */
    10661064        linuxnative_pread_packets,                      /* pread_packets */
    1067         linuxnative_ppause_input,                       /* ppause */
     1065        linuxnative_pause_input,                        /* ppause */
    10681066        linuxnative_fin_input,                          /* p_fin */
    10691067        linuxnative_pconfig_input,                      /* pconfig input */
  • lib/format_linux.h

    r1871afc r771ab22  
    244244} ALIGN_STRUCT(CACHE_LINE_SIZE);
    245245
     246#define ZERO_LINUX_STREAM {-1, NULL, 0}
     247
     248
    246249/* Format header for encapsulating packets captured using linux native */
    247250struct libtrace_linuxnative_header {
Note: See TracChangeset for help on using the changeset viewer.