Changeset 639d952


Ignore:
Timestamp:
11/13/17 17:24:27 (3 years ago)
Author:
Shane Alcock <salcock@…>
Branches:
cachetimestamps, develop, dpdk-ndag, etsilive, master, ndag_format, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance
Children:
5d8280a
Parents:
b34f924
Message:

Minor performance tweaks for format_ndag

Most notably, split receive_encap_records() into two separate
versions (blocking and nonblocking) which should avoid a few
"if (block)" checks.

Also removed extraneous call to receiver_read_messages when using
the pread function.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • lib/format_ndag.c

    rb34f924 r639d952  
    736736                return 0;
    737737        }
     738        /*
    738739        if (ssock->nextread - ssock->saved[ssock->nextreadind] >=
    739740                        ssock->savedsize[ssock->nextreadind]) {
    740741                return 0;
    741742        }
     743        */
    742744        return 1;
    743745
     
    862864        int avail, ret, ndagstat, i;
    863865        int toret = 0;
    864         struct timespec ts;
    865866
    866867        if (ssock->sock == -1) {
     
    876877        }
    877878
    878         ts.tv_sec = 0;
    879         ts.tv_nsec = 10000;
    880 
    881879        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
    882                         MSG_DONTWAIT, &ts);
    883 
    884         /*
    885            ret = recvfrom(rt->sources[i].sock, rt->sources[i].saved[nw],
    886            ENCAP_BUFSIZE, MSG_DONTWAIT,
    887            rt->sources[i].srcaddr->ai_addr,
    888            &(rt->sources[i].srcaddr->ai_addrlen));
    889          */
     880                        MSG_DONTWAIT, NULL);
     881
    890882        if (ret < 0) {
    891883                /* Nothing to receive right now, but we should still
     
    955947
    956948
    957 static int receive_encap_records(libtrace_t *libtrace, recvstream_t *rt,
    958                 libtrace_packet_t *packet, int block) {
     949static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
     950                libtrace_packet_t *packet) {
    959951
    960952        int iserr = 0;
     
    972964
    973965                /* Check for any messages from the control thread */
    974                 if (block) {
    975                         iserr = receiver_read_messages(rt);
    976 
    977                         if (iserr <= 0) {
    978                                 return iserr;
    979                         }
    980                 }
    981 
    982                 /* If non-blocking and there are no sources, just break */
    983                 if (!block && rt->sourcecount == 0) {
    984                         iserr = 0;
    985                         break;
     966                iserr = receiver_read_messages(rt);
     967
     968                if (iserr <= 0) {
     969                        return iserr;
    986970                }
    987971
     
    989973                 * checking for messages again.
    990974                 */
    991                 if (block && rt->sourcecount == 0) {
     975                if (rt->sourcecount == 0) {
    992976                        usleep(10000);
    993977                        continue;
     
    1005989                 * a short break rather than immediately trying again.
    1006990                 */
    1007                 if (block && iserr == 0) {
     991                if (iserr == 0) {
    1008992                        usleep(100);
    1009993                }
    1010994
    1011         } while (block);
     995        } while (1);
    1012996
    1013997        return iserr;
     998}
     999
     1000static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
     1001                libtrace_packet_t *packet) {
     1002
     1003        int iserr = 0;
     1004
     1005        if (packet->buf_control == TRACE_CTRL_PACKET) {
     1006                free(packet->buffer);
     1007                packet->buffer = NULL;
     1008        }
     1009
     1010        /* Make sure we shouldn't be halting */
     1011        if ((iserr = is_halted(libtrace)) != -1) {
     1012                return iserr;
     1013        }
     1014
     1015        /* If non-blocking and there are no sources, just break */
     1016        if (rt->sourcecount == 0) {
     1017                return 0;
     1018        }
     1019
     1020        return receive_from_sockets(rt);
    10141021}
    10151022
     
    10471054        int rem;
    10481055        streamsock_t *nextavail = NULL;
    1049         rem = receive_encap_records(libtrace, &(FORMAT_DATA->receivers[0]),
    1050                         packet, 1);
     1056        rem = receive_encap_records_block(libtrace, &(FORMAT_DATA->receivers[0]),
     1057                        packet);
    10511058
    10521059        if (rem <= 0) {
     
    10771084        rt = (recvstream_t *)t->format_data;
    10781085
    1079         /* Only check for messages once per batch */
    1080         rem = receiver_read_messages(rt);
    1081         if (rem <= 0) {
    1082                 return rem;
    1083         }
    10841086
    10851087        do {
    1086                 rem = receive_encap_records(libtrace, rt,
    1087                                 packets[read_packets],
    1088                                 read_packets == 0 ? 1 : 0);
     1088                /* Only check for messages once per batch */
     1089                if (read_packets == 0) {
     1090                        rem = receive_encap_records_block(libtrace, rt,
     1091                                packets[read_packets]);
     1092                } else {
     1093                        rem = receive_encap_records_nonblock(libtrace, rt,
     1094                                packets[read_packets]);
     1095                }
     1096
    10891097                if (rem < 0) {
    10901098                        return rem;
     
    11221130        streamsock_t *nextavail = NULL;
    11231131
    1124         /* Only check for messages once per batch */
     1132        /* Only check for messages once per call */
    11251133        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
    11261134        if (rem <= 0) {
     
    11301138
    11311139        do {
    1132                 rem = receive_encap_records(libtrace,
    1133                                 &(FORMAT_DATA->receivers[0]), packet, 0);
     1140                rem = receive_encap_records_nonblock(libtrace,
     1141                                &(FORMAT_DATA->receivers[0]), packet);
    11341142
    11351143                if (rem < 0) {
Note: See TracChangeset for help on using the changeset viewer.