/*
*
* Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
* All rights reserved.
*
* This file is part of libtrace.
*
* This code has been developed by the University of Waikato WAND
* research group. For further information please see http://www.wand.net.nz/
*
* libtrace is free software; you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* libtrace is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*
*/
#define _GNU_SOURCE
#include "config.h"
#include "common.h"
#include "libtrace.h"
#include "libtrace_int.h"
#include "format_helper.h"
#include "format_erf.h"
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "format_ndag.h"
/* Seconds a stream socket may go without receiving anything before it is
 * closed (see receive_from_single_socket). */
#define NDAG_IDLE_TIMEOUT (600)
/* Size in bytes of each buffer that a stream datagram is received into. */
#define ENCAP_BUFSIZE (10000)
/* Size in bytes of the buffer used to receive control-channel messages. */
#define CTRL_BUF_SIZE (10000)
/* Number of receive buffers kept per stream. */
#define ENCAP_BUFFERS (1000)
/* Maximum number of datagrams requested per recvmmsg() call. */
#define RECV_BATCH_SIZE (50)
/* Shorthand for this format's private data; requires a variable named
 * 'libtrace' to be in scope. */
#define FORMAT_DATA ((ndag_format_data_t *)libtrace->format_data)
static struct libtrace_format_t ndag;
/* When non-zero, the control thread's receive loop stops. The control
 * thread resets it to 0 when it starts.
 * NOTE(review): volatile is not a thread synchronisation primitive, and
 * this flag is shared by every nDAG input in the process. */
volatile int ndag_paused = 0;
/* Per-receiver state for one nDAG monitor (exporter). */
typedef struct monitor {
        uint16_t monitorid;
        /* Last 'started' time seen in an encap header from this monitor.
         * A change means the monitor restarted. 0 = none seen yet. */
        uint64_t laststart;
} ndag_monitor_t;
/* Description of a stream to join, passed from the control thread to a
 * receiver thread. The strings point at the format data and are not
 * copied. */
typedef struct streamsource {
        uint16_t monitor;
        char *groupaddr;
        char *localiface;
        uint16_t port;
} streamsource_t;
/* Receive state for one joined stream (multicast group + port). */
typedef struct streamsock {
        char *groupaddr;
        int sock;                       /* -1 once the socket is closed */
        struct addrinfo *srcaddr;       /* address list from join_multicast_group() */
        uint16_t port;
        uint32_t expectedseq;           /* next expected sequence number; 0 = unknown */
        ndag_monitor_t *monitorptr;     /* points into recvstream_t.knownmonitors */
        char **saved;                   /* ring of ENCAP_BUFFERS receive buffers */
        char *nextread;                 /* next ERF record inside saved[nextreadind] */
        int nextreadind;
        int nextwriteind;               /* next buffer to receive into */
        int savedsize[ENCAP_BUFFERS];   /* bytes held by each buffer; 0 = empty */
        uint64_t nextts;                /* cached timestamp of nextread; 0 = not cached */
        uint32_t startidle;             /* time (seconds) idling began; 0 = not idle */
        uint64_t recordcount;
        int bufavail;                   /* buffers free to receive into */
        int bufwaiting;                 /* buffers consumed but not yet returned to bufavail */
#if HAVE_DECL_RECVMMSG
        struct mmsghdr mmsgbufs[RECV_BATCH_SIZE];
#else
        struct msghdr singlemsg;
#endif
} streamsock_t;
/* State for one receiver: one per per-packet thread in parallel mode, or a
 * single receiver when started via ndag_start_input(). */
typedef struct recvstream {
        streamsock_t *sources;          /* joined streams */
        uint16_t sourcecount;
        libtrace_message_queue_t mqueue; /* messages from the control thread */
        int threadindex;
        ndag_monitor_t *knownmonitors;
        uint16_t monitorcount;
        uint64_t dropped_upstream;      /* sum of ERF lctr values */
        uint64_t missing_records;       /* sequence number gaps */
        uint64_t received_packets;
        int maxfd;                      /* largest stream socket; -1 = none */
} recvstream_t;
/* Private data for an nDAG input. */
typedef struct ndag_format_data {
        char *multicastgroup;           /* parsed from the URI */
        char *portstr;                  /* control channel port, parsed from the URI */
        char *localiface;               /* parsed from the URI */
        uint16_t nextthreadid;          /* receiver the next new stream is assigned to */
        recvstream_t *receivers;
        pthread_t controlthread;
        libtrace_message_queue_t controlqueue; /* NOTE(review): appears unused here */
        int consterfframing;            /* constant ERF framing length; -1 = not set */
} ndag_format_data_t;
/* Message types sent from the control thread to receiver threads. */
enum {
        NDAG_CLIENT_HALT = 0x01,
        NDAG_CLIENT_RESTARTED = 0x02, // redundant: not handled by receivers
        NDAG_CLIENT_NEWGROUP = 0x03
};
/* A control-thread -> receiver message. */
typedef struct ndagreadermessage {
        uint8_t type;
        streamsource_t contents;
} ndag_internal_message_t;
/* Compute the signed distance from sequence number seq_b to seq_a.
 *
 * nDAG sequence numbers are 32 bit and wrap around, but the value zero is
 * never used (..., 0xfffffffe, 0xffffffff, 1, 2, ...). The two numbers
 * are assumed to be within 2^31 of each other, so the shorter direction
 * is taken as the true distance. If that path crosses the wrap point, one
 * is removed from the distance to account for the skipped zero.
 *
 * @return a positive value if seq_a comes after seq_b (e.g. the number of
 *         records skipped when seq_b was expected and seq_a arrived), a
 *         negative value if seq_a comes before seq_b, 0 if they are equal.
 */
static inline int seq_cmp(uint32_t seq_a, uint32_t seq_b) {
        uint32_t forward, backward;

        if (seq_a == seq_b) return 0;

        /* Unsigned subtraction is well-defined modulo 2^32 */
        forward = seq_a - seq_b;
        if (forward <= 0x7fffffffU) {
                /* seq_a is ahead; it crossed zero if numerically smaller */
                if (seq_a < seq_b) {
                        forward -= 1;
                }
                return (int) forward;
        }

        /* seq_a is behind seq_b; it crossed zero if numerically larger */
        backward = seq_b - seq_a;
        if (seq_a > seq_b) {
                backward -= 1;
        }
        /* backward >= 1 here; this form never overflows int */
        return -(int) (backward - 1) - 1;
}
/* Validate the common nDAG header at the start of a received message.
 *
 * @param msgbuf  start of the received message
 * @param msgsize number of bytes received
 * @return the message type from the header, or 0 if the message is too
 *         short, has the wrong magic number or an unsupported header
 *         version (a diagnostic is printed to stderr in each case).
 */
static uint8_t check_ndag_header(char *msgbuf, uint32_t msgsize) {
        const ndag_common_t *hdr = (const ndag_common_t *)msgbuf;
        int version_ok;

        /* Never look at header fields that were not actually received */
        if (sizeof(ndag_common_t) > msgsize) {
                fprintf(stderr,
                        "nDAG message does not have a complete nDAG header.\n");
                return 0;
        }

        if (NDAG_MAGIC_NUMBER != ntohl(hdr->magic)) {
                fprintf(stderr,
                        "nDAG message does not have a valid magic number.\n");
                return 0;
        }

        /* Version 0 is never valid; anything newer than we know is rejected */
        version_ok = (hdr->version != 0 && hdr->version <= NDAG_EXPORT_VERSION);
        if (!version_ok) {
                fprintf(stderr,
                        "nDAG message has an invalid header version: %u\n",
                        hdr->version);
                return 0;
        }

        return hdr->type;
}
/* Create a UDP socket bound to the requested port and join a multicast
 * group on it.
 *
 * @param groupaddr  multicast group address to join (passed to getaddrinfo)
 * @param localiface name of the local interface to join the group on
 * @param portstr    port to bind to, as a string; may be NULL, in which
 *                   case portnum is used instead
 * @param portnum    port to bind to when portstr is NULL
 * @param srcinfo    on success, set to the address list the socket was
 *                   bound to (owned by the caller, release with
 *                   freeaddrinfo()); set to NULL on failure
 * @return the new socket descriptor, or -1 on failure (a diagnostic is
 *         printed to stderr and nothing is leaked)
 */
static int join_multicast_group(char *groupaddr, char *localiface,
                char *portstr, uint16_t portnum, struct addrinfo **srcinfo) {
        struct addrinfo hints;
        struct addrinfo *gotten = NULL;
        struct addrinfo *group = NULL;
        unsigned int interface;
        char pstr[16];
        struct group_req greq;
        int bufsize, val, gaierr;
        int sock = -1;

        *srcinfo = NULL;

        if (portstr == NULL) {
                snprintf(pstr, sizeof(pstr), "%u", portnum);
                portstr = pstr;
        }

        interface = if_nametoindex(localiface);
        if (interface == 0) {
                fprintf(stderr, "Failed to lookup interface %s -- %s\n",
                                localiface, strerror(errno));
                return -1;
        }

        /* getaddrinfo() requires every hints field we don't set to be
         * zero / NULL */
        memset(&hints, 0, sizeof(hints));
        hints.ai_family = PF_UNSPEC;
        hints.ai_socktype = SOCK_DGRAM;
        hints.ai_flags = AI_PASSIVE;
        hints.ai_protocol = 0;

        /* getaddrinfo() reports failures through its return value, not errno */
        gaierr = getaddrinfo(NULL, portstr, &hints, &gotten);
        if (gaierr != 0) {
                fprintf(stderr,
                        "Call to getaddrinfo failed for NULL:%s -- %s\n",
                        portstr, gai_strerror(gaierr));
                return -1;
        }

        gaierr = getaddrinfo(groupaddr, NULL, &hints, &group);
        if (gaierr != 0) {
                fprintf(stderr, "Call to getaddrinfo failed for %s -- %s\n",
                                groupaddr, gai_strerror(gaierr));
                freeaddrinfo(gotten);
                return -1;
        }

        sock = socket(gotten->ai_family, gotten->ai_socktype, 0);
        if (sock < 0) {
                fprintf(stderr,
                        "Failed to create multicast socket for %s:%s -- %s\n",
                        groupaddr, portstr, strerror(errno));
                goto fail;
        }

        val = 1;
        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) {
                fprintf(stderr,
                        "Failed to set REUSEADDR socket option for %s:%s -- %s\n",
                        groupaddr, portstr, strerror(errno));
                goto fail;
        }

        if (bind(sock, (struct sockaddr *)gotten->ai_addr, gotten->ai_addrlen) < 0)
        {
                fprintf(stderr,
                        "Failed to bind to multicast socket %s:%s -- %s\n",
                        groupaddr, portstr, strerror(errno));
                goto fail;
        }

        memset(&greq, 0, sizeof(greq));
        greq.gr_interface = interface;
        memcpy(&(greq.gr_group), group->ai_addr, group->ai_addrlen);

        if (setsockopt(sock, IPPROTO_IP, MCAST_JOIN_GROUP, &greq,
                        sizeof(greq)) < 0) {
                fprintf(stderr,
                        "Failed to join multicast group %s:%s -- %s\n",
                        groupaddr, portstr, strerror(errno));
                goto fail;
        }

        bufsize = 16 * 1024 * 1024;
        if (setsockopt(sock, SOL_SOCKET, SO_RCVBUF, &bufsize,
                                (socklen_t)sizeof(int)) < 0) {
                fprintf(stderr,
                        "Failed to increase buffer size for multicast group %s:%s -- %s\n",
                        groupaddr, portstr, strerror(errno));
                goto fail;
        }

        freeaddrinfo(group);
        *srcinfo = gotten;
        return sock;

fail:
        /* Every failure path releases the socket and both address lists */
        if (sock >= 0) {
                close(sock);
        }
        freeaddrinfo(group);
        freeaddrinfo(gotten);
        return -1;
}
/* Allocate the private data for an nDAG input and parse its URI.
 *
 * The URI data is split on commas: the local interface name, then the
 * multicast group of the control channel, then an optional control port
 * (defaulting to "9001").
 *
 * @return 0 on success, -1 on failure (with the libtrace error set).
 */
static int ndag_init_input(libtrace_t *libtrace) {
        char *scan = NULL;
        char *next = NULL;

        /* calloc so that fields not explicitly set below start zeroed */
        libtrace->format_data = (ndag_format_data_t *)calloc(1,
                        sizeof(ndag_format_data_t));
        if (!libtrace->format_data) {
                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Unable to allocate memory for "
                        "format data inside ndag_init_input()");
                return -1;
        }

        FORMAT_DATA->multicastgroup = NULL;
        FORMAT_DATA->portstr = NULL;
        FORMAT_DATA->localiface = NULL;
        FORMAT_DATA->nextthreadid = 0;
        FORMAT_DATA->receivers = NULL;
        FORMAT_DATA->consterfframing = -1;

        scan = strchr(libtrace->uridata, ',');
        if (scan == NULL) {
                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
                        "Bad ndag URI. Should be ndag:,,");
                return -1;
        }
        FORMAT_DATA->localiface = strndup(libtrace->uridata,
                        (size_t)(scan - libtrace->uridata));
        next = scan + 1;

        scan = strchr(next, ',');
        if (scan == NULL) {
                FORMAT_DATA->portstr = strdup("9001");
                FORMAT_DATA->multicastgroup = strdup(next);
        } else {
                FORMAT_DATA->multicastgroup = strndup(next, (size_t)(scan - next));
                FORMAT_DATA->portstr = strdup(scan + 1);
        }

        /* strdup()/strndup() can fail, and later code dereferences all three */
        if (FORMAT_DATA->localiface == NULL ||
                        FORMAT_DATA->multicastgroup == NULL ||
                        FORMAT_DATA->portstr == NULL) {
                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
                        "Unable to allocate memory for the nDAG URI "
                        "inside ndag_init_input()");
                return -1;
        }
        return 0;
}
/* Apply a configuration option to an nDAG input.
 *
 * Only TRACE_OPTION_CONSTANT_ERF_FRAMING is supported; every other option
 * is rejected with TRACE_ERR_OPTION_UNAVAIL.
 *
 * @return 0 if the option was applied, -1 otherwise.
 */
static int ndag_config_input(libtrace_t *libtrace, trace_option_t option,
                void *value) {
        if (option != TRACE_OPTION_CONSTANT_ERF_FRAMING) {
                trace_set_err(libtrace, TRACE_ERR_OPTION_UNAVAIL,
                                "Unsupported option");
                return -1;
        }

        FORMAT_DATA->consterfframing = *(int *)value;
        return 0;
}
/* Tell receiver 'threadid' to start receiving the stream on 'portnum',
 * exported by monitor 'monid'. The stream uses the configured multicast
 * group and local interface; those strings are shared, not copied.
 */
static void new_group_alert(libtrace_t *libtrace, uint16_t threadid,
                uint16_t portnum, uint16_t monid) {
        ndag_internal_message_t msg;
        libtrace_message_queue_t *queue =
                        &(FORMAT_DATA->receivers[threadid].mqueue);

        msg.type = NDAG_CLIENT_NEWGROUP;
        msg.contents.monitor = monid;
        msg.contents.port = portnum;
        msg.contents.groupaddr = FORMAT_DATA->multicastgroup;
        msg.contents.localiface = FORMAT_DATA->localiface;

        libtrace_message_queue_put(queue, (void *)&msg);
}
/* Parse one message received on the nDAG control channel.
 *
 * Only beacons are expected. A beacon lists the ports of every stream the
 * monitor exports. Each port not seen before is assigned to a receiver
 * thread (round-robin), and that thread is told to join it.
 *
 * @param msgbuf  received message
 * @param msgsize number of bytes received
 * @param ptmap   port -> receiver map; 0xffff marks an unassigned port
 * @return 0 on success, -1 if the message is malformed or unexpected
 */
static int ndag_parse_control_message(libtrace_t *libtrace, char *msgbuf,
                int msgsize, uint16_t *ptmap) {
        int i;
        ndag_common_t *ndaghdr = (ndag_common_t *)msgbuf;
        uint8_t msgtype;
        const char *cursor;
        uint16_t numstreams, streamport;

        msgtype = check_ndag_header(msgbuf, (uint32_t)msgsize);
        if (msgtype == 0) {
                return -1;
        }

        if (msgtype != NDAG_PKT_BEACON) {
                fprintf(stderr,
                        "Unexpected message type on control channel: %u\n",
                        msgtype);
                return -1;
        }

        /* check_ndag_header() guarantees at least a full common header */
        msgsize -= sizeof(ndag_common_t);
        if ((uint32_t)msgsize < sizeof(uint16_t)) {
                fprintf(stderr, "Malformed beacon (missing number of streams).\n");
                return -1;
        }

        /* The beacon body is a stream count followed by one port per stream,
         * all 16-bit big-endian. memcpy avoids unaligned / type-punned
         * reads from the char receive buffer. */
        cursor = msgbuf + sizeof(ndag_common_t);
        memcpy(&numstreams, cursor, sizeof(uint16_t));
        numstreams = ntohs(numstreams);
        cursor += sizeof(uint16_t);

        if ((uint32_t)msgsize != ((numstreams + 1) * sizeof(uint16_t)))
        {
                fprintf(stderr, "Malformed beacon (length doesn't match number of streams).\n");
                fprintf(stderr, "%u %u\n", (unsigned int)msgsize,
                                (unsigned int)numstreams);
                return -1;
        }

        for (i = 0; i < numstreams; i++) {
                memcpy(&streamport, cursor, sizeof(uint16_t));
                streamport = ntohs(streamport);
                cursor += sizeof(uint16_t);

                if (ptmap[streamport] != 0xffff) {
                        /* Already assigned to a receiver */
                        continue;
                }

                new_group_alert(libtrace,
                                FORMAT_DATA->nextthreadid, streamport,
                                ntohs(ndaghdr->monitorid));
                ptmap[streamport] = FORMAT_DATA->nextthreadid;

                if (libtrace->perpkt_thread_count == 0) {
                        FORMAT_DATA->nextthreadid = 0;
                } else {
                        FORMAT_DATA->nextthreadid =
                                ((FORMAT_DATA->nextthreadid + 1) % libtrace->perpkt_thread_count);
                }
        }
        return 0;
}
/* Body of the nDAG control-channel thread.
 *
 * Joins the control multicast group, then reads beacons until the trace is
 * halted, ndag_paused is set, or the channel fails. New streams are handed
 * to receiver threads by ndag_parse_control_message(). If the loop ends
 * because the channel failed, trace_interrupt() is called so the receiver
 * threads stop as well.
 *
 * @param tdata the libtrace_t for this input
 */
static void *ndag_controller_run(void *tdata) {
        libtrace_t *libtrace = (libtrace_t *)tdata;
        uint16_t ptmap[65536];
        int sock = -1;
        struct addrinfo *receiveaddr = NULL;
        fd_set listening;
        struct timeval timeout;

        /* ptmap is a dirty hack to allow us to quickly check if we've already
         * assigned a stream to a thread (0xffff = unassigned).
         */
        memset(ptmap, 0xff, 65536 * sizeof(uint16_t));

        sock = join_multicast_group(FORMAT_DATA->multicastgroup,
                        FORMAT_DATA->localiface, FORMAT_DATA->portstr, 0,
                        &receiveaddr);
        if (sock == -1) {
                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
                        "Unable to join multicast group for nDAG control channel");
                if (receiveaddr) {
                        freeaddrinfo(receiveaddr);
                }
                trace_interrupt();
                pthread_exit(NULL);
        }

        ndag_paused = 0;
        while ((is_halted(libtrace) == -1) && !ndag_paused) {
                int ret;
                char buf[CTRL_BUF_SIZE];

                FD_ZERO(&listening);
                FD_SET(sock, &listening);

                timeout.tv_sec = 0;
                timeout.tv_usec = 500000;

                ret = select(sock + 1, &listening, NULL, NULL, &timeout);
                if (ret < 0) {
                        /* A signal is not a channel failure; re-check the
                         * halt / pause flags and carry on */
                        if (errno == EINTR) {
                                continue;
                        }
                        fprintf(stderr, "Error while waiting for nDAG control messages: %s\n", strerror(errno));
                        break;
                }

                if (!FD_ISSET(sock, &listening)) {
                        continue;
                }

                ret = recvfrom(sock, buf, CTRL_BUF_SIZE, 0,
                                receiveaddr->ai_addr,
                                &(receiveaddr->ai_addrlen));
                if (ret < 0) {
                        if (errno == EINTR) {
                                continue;
                        }
                        fprintf(stderr, "Error while receiving nDAG control message: %s\n", strerror(errno));
                        break;
                }

                if (ret == 0) {
                        /* A zero-length UDP datagram, not end-of-stream */
                        continue;
                }

                if (ndag_parse_control_message(libtrace, buf, ret, ptmap) < 0) {
                        fprintf(stderr, "Error while parsing nDAG control message.\n");
                        continue;
                }
        }

        if (sock >= 0) {
                close(sock);
        }
        if (receiveaddr) {
                freeaddrinfo(receiveaddr);
        }

        /* is_halted() returns -1 while the trace is still running. Only
         * interrupt when the loop ended because the control channel failed,
         * not because of a halt or a pause. */
        if (is_halted(libtrace) == -1 && !ndag_paused) {
                trace_interrupt();
        }

        pthread_exit(NULL);
}
/* Set up 'maxthreads' receivers and start the control-channel thread.
 *
 * @return maxthreads on success, -1 on failure (with the libtrace error set)
 */
static int ndag_start_threads(libtrace_t *libtrace, uint32_t maxthreads)
{
        int ret;
        uint32_t i;

        /* Configure the set of receiver threads */
        if (FORMAT_DATA->receivers == NULL) {
                /* What if the number of threads changes between a pause and
                 * a restart? Can this happen? The receiver count is not
                 * stored, so the existing array is reused as-is. */
                FORMAT_DATA->receivers = (recvstream_t *)
                        malloc(sizeof(recvstream_t) * maxthreads);
                if (FORMAT_DATA->receivers == NULL) {
                        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
                                "Unable to allocate memory for nDAG receivers");
                        return -1;
                }
        }

        for (i = 0; i < maxthreads; i++) {
                FORMAT_DATA->receivers[i].sources = NULL;
                FORMAT_DATA->receivers[i].sourcecount = 0;
                FORMAT_DATA->receivers[i].knownmonitors = NULL;
                FORMAT_DATA->receivers[i].monitorcount = 0;
                FORMAT_DATA->receivers[i].threadindex = i;
                FORMAT_DATA->receivers[i].dropped_upstream = 0;
                FORMAT_DATA->receivers[i].received_packets = 0;
                FORMAT_DATA->receivers[i].missing_records = 0;
                FORMAT_DATA->receivers[i].maxfd = -1;

                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
                                sizeof(ndag_internal_message_t));
        }

        /* Start the controller thread */
        /* TODO consider affinity of this thread? */
        ret = pthread_create(&(FORMAT_DATA->controlthread), NULL,
                        ndag_controller_run, libtrace);
        if (ret != 0) {
                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
                        "Unable to create nDAG control thread");
                return -1;
        }

        return (int)maxthreads;
}
/* Non-parallel start: a single receiver plus the control thread.
 * @return the value of ndag_start_threads() (1 on success, -1 on failure)
 */
static int ndag_start_input(libtrace_t *libtrace) {
        const uint32_t single_receiver = 1;

        return ndag_start_threads(libtrace, single_receiver);
}
/* Parallel start: one receiver per per-packet thread.
 * @return 0 if every receiver was set up, -1 otherwise.
 */
static int ndag_pstart_input(libtrace_t *libtrace) {
        int started = ndag_start_threads(libtrace,
                        libtrace->perpkt_thread_count);

        return (started == libtrace->perpkt_thread_count) ? 0 : -1;
}
/* Release everything owned by a receiver.
 *
 * Frees the receiver's message queue and, for each joined stream, its
 * record buffers, receive iovecs, bound address list and socket. The
 * source and monitor arrays are freed and reset, so a repeated call does
 * not free them a second time.
 */
static void halt_ndag_receiver(recvstream_t *receiver) {
        int j, i;

        libtrace_message_queue_destroy(&(receiver->mqueue));

        if (receiver->sources == NULL)
                return;

        for (i = 0; i < receiver->sourcecount; i++) {
                /* Work through a pointer: streamsock_t is a large struct */
                streamsock_t *src = &(receiver->sources[i]);

                if (src->saved) {
                        for (j = 0; j < ENCAP_BUFFERS; j++) {
                                free(src->saved[j]);
                        }
                        free(src->saved);
                        src->saved = NULL;
                }

#if HAVE_DECL_RECVMMSG
                for (j = 0; j < RECV_BATCH_SIZE; j++) {
                        free(src->mmsgbufs[j].msg_hdr.msg_iov);
                        src->mmsgbufs[j].msg_hdr.msg_iov = NULL;
                }
#else
                free(src->singlemsg.msg_iov);
                src->singlemsg.msg_iov = NULL;
#endif

                if (src->sock != -1) {
                        close(src->sock);
                        src->sock = -1;
                }

                /* Allocated by join_multicast_group() when the stream was
                 * added; previously never released */
                if (src->srcaddr) {
                        freeaddrinfo(src->srcaddr);
                        src->srcaddr = NULL;
                }
        }

        free(receiver->knownmonitors);
        receiver->knownmonitors = NULL;
        receiver->monitorcount = 0;

        free(receiver->sources);
        receiver->sources = NULL;
        receiver->sourcecount = 0;
        receiver->maxfd = -1;
}
/* Pause the input by tearing down every receiver (see halt_ndag_receiver).
 *
 * ndag_start_input() creates exactly one receiver, and stream assignment
 * treats perpkt_thread_count == 0 as a valid case, so at least one
 * receiver is always halted.
 * NOTE(review): the control thread is not stopped or joined here.
 *
 * @return 0
 */
static int ndag_pause_input(libtrace_t *libtrace) {
        int i, count;

        /* Nothing to halt if the input was never started */
        if (FORMAT_DATA->receivers == NULL) {
                return 0;
        }

        count = libtrace->perpkt_thread_count;
        if (count <= 0) {
                count = 1;
        }

        /* Close the existing receiver sockets */
        for (i = 0; i < count; i++) {
                halt_ndag_receiver(&(FORMAT_DATA->receivers[i]));
        }
        return 0;
}
/* Free the input's private data (receiver array and URI strings).
 * @return 0
 */
static int ndag_fin_input(libtrace_t *libtrace) {
        /* format_data is NULL if ndag_init_input() failed to allocate it */
        if (FORMAT_DATA == NULL) {
                return 0;
        }

        /* free(NULL) is a no-op, so no guards are needed */
        free(FORMAT_DATA->receivers);
        free(FORMAT_DATA->multicastgroup);
        free(FORMAT_DATA->portstr);
        free(FORMAT_DATA->localiface);

        free(libtrace->format_data);
        libtrace->format_data = NULL;
        return 0;
}
/* ERF framing length of an nDAG packet. This is the constant set with
 * TRACE_OPTION_CONSTANT_ERF_FRAMING if one was configured, otherwise
 * whatever erf_get_framing_length() reports for this record.
 */
static int ndag_get_framing_length(const libtrace_packet_t *packet) {
        libtrace_t *libtrace = packet->trace;
        int fixed = FORMAT_DATA->consterfframing;

        return (fixed >= 0) ? fixed : erf_get_framing_length(packet);
}
/* Turn the next ERF record of stream 'ssock' into 'packet' and advance the
 * stream's read position.
 *
 * The packet points directly into the stream's receive buffer
 * (TRACE_CTRL_EXTERNAL); no copy is made. Side effects:
 *  - the record's rlen may be rewritten in place (rx-error and truncated
 *    records);
 *  - the record's lctr is added to rt->dropped_upstream (skipped for the
 *    very first packet and for TYPE_DSM_COLOR_ETH records);
 *  - when a receive buffer has been fully consumed it is marked empty and
 *    counted in ssock->bufwaiting (callers move those back to bufavail).
 *
 * @return the record length consumed, or -1 if the record runs past the
 *         end of the received data (libtrace error set).
 */
static int ndag_prepare_packet_stream(libtrace_t *restrict libtrace,
                recvstream_t *restrict rt,
                streamsock_t *restrict ssock,
                libtrace_packet_t *restrict packet,
                uint32_t flags UNUSED) {

        /* XXX flags is constant, so we can tell the compiler to not
         * bother copying over the parameter
         */

        dag_record_t *erfptr;
        ndag_encap_t *encaphdr;
        uint16_t ndag_reccount = 0;
        int nr;
        uint16_t rlen;

        /*
        if ((flags & TRACE_PREP_OWN_BUFFER) == TRACE_PREP_OWN_BUFFER) {
                packet->buf_control = TRACE_CTRL_PACKET;
        } else {
                packet->buf_control = TRACE_CTRL_EXTERNAL;
        }
        */

        /* The packet borrows the stream's buffer */
        packet->buf_control = TRACE_CTRL_EXTERNAL;

        packet->trace = libtrace;
        packet->buffer = ssock->nextread;
        packet->header = ssock->nextread;
        packet->type = TRACE_RT_DATA_ERF;

        erfptr = (dag_record_t *)packet->header;

        if (erfptr->flags.rxerror == 1) {
                /* Records with the rx-error flag get no payload, and their
                 * rlen is rewritten to cover only the ERF framing */
                packet->payload = NULL;
                if (FORMAT_DATA->consterfframing >= 0) {
                        erfptr->rlen = htons(FORMAT_DATA->consterfframing & 0xffff);
                } else {
                        erfptr->rlen = htons(erf_get_framing_length(packet));
                }
        } else {
                if (FORMAT_DATA->consterfframing >= 0) {
                        packet->payload = (char *)packet->buffer +
                                FORMAT_DATA->consterfframing;
                } else {
                        packet->payload = (char *)packet->buffer +
                                erf_get_framing_length(packet);
                }
        }

        /* Update upstream drops using lctr (ignored for the first packet
         * this receiver sees) */
        if (erfptr->type == TYPE_DSM_COLOR_ETH) {
                /* TODO */
        } else {
                if (rt->received_packets > 0) {
                        rt->dropped_upstream += ntohs(erfptr->lctr);
                }
        }

        rt->received_packets ++;
        ssock->recordcount += 1;

        nr = ssock->nextreadind;
        encaphdr = (ndag_encap_t *)(ssock->saved[nr] +
                        sizeof(ndag_common_t));

        /* The top bit of the encap header's record count marks a truncated
         * record: its length becomes whatever remains in the buffer */
        ndag_reccount = ntohs(encaphdr->recordcount);
        if ((ndag_reccount & 0x8000) != 0) {
                /* Record was truncated -- update rlen appropriately */
                rlen = ssock->savedsize[nr] -
                                (ssock->nextread - ssock->saved[nr]);
                erfptr->rlen = htons(rlen);
        } else {
                rlen = ntohs(erfptr->rlen);
        }
        /* NOTE(review): an rlen of 0 leaves nextread where it is, so the
         * same record would be returned on every call -- consider
         * rejecting it. */
        ssock->nextread += rlen;
        /* Invalidate the cached timestamp; nextread moved */
        ssock->nextts = 0;

        /* NOTE(review): this runs after the packet was counted and nextread
         * advanced; TRACE_ERR_INIT_FAILED also looks like the wrong code
         * for a bad record. */
        if (ssock->nextread - ssock->saved[nr] > ssock->savedsize[nr]) {
                trace_set_err(libtrace, TRACE_ERR_INIT_FAILED, "Walked past the end of the "
                        "nDAG receive buffer, probably due to a invalid rlen, in ndag_prepare_packet_stream()");
                return -1;
        }

        if (ssock->nextread - ssock->saved[nr] >= ssock->savedsize[nr]) {
                /* Read everything from this buffer, mark as empty and
                 * move on. */
                ssock->savedsize[nr] = 0;
                ssock->bufwaiting ++;

                nr ++;
                if (nr == ENCAP_BUFFERS) {
                        nr = 0;
                }
                /* Skip the nDAG headers of the next buffer */
                ssock->nextread = ssock->saved[nr] + sizeof(ndag_common_t) +
                                sizeof(ndag_encap_t);
                ssock->nextreadind = nr;
        }

        packet->order = erf_get_erf_timestamp(packet);
        /* The record length is also reported through packet->error */
        packet->error = rlen;
        return rlen;
}
/* prepare_packet callback. nDAG records are prepared straight from the
 * receive buffers (see ndag_prepare_packet_stream), so a request to prepare
 * one from an RT buffer only produces a warning.
 *
 * @return always 0
 */
static int ndag_prepare_packet(libtrace_t *libtrace UNUSED,
                libtrace_packet_t *packet UNUSED,
                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
                uint32_t flags UNUSED) {
        const char *complaint =
                "Sending nDAG records over RT doesn't make sense! Please stop\n";

        fputs(complaint, stderr);
        return 0;
}
/* Append a record for monitor 'monid' to rt's list of known monitors.
 *
 * The list grows in chunks of 5. Streams keep pointers into this array
 * (streamsock_t.monitorptr), so when it grows a new array is allocated,
 * the entries are copied and every existing stream is re-pointed before
 * the old array is freed. Growing with realloc() would leave those
 * pointers dangling whenever the block moved.
 *
 * @return the new monitor record, or NULL if memory could not be allocated
 */
static ndag_monitor_t *add_new_knownmonitor(recvstream_t *rt, uint16_t monid) {
        ndag_monitor_t *mon;
        int i;

        if ((rt->monitorcount % 5) == 0) {
                ndag_monitor_t *grown = (ndag_monitor_t *)malloc(
                                sizeof(ndag_monitor_t) * (rt->monitorcount + 5));
                if (grown == NULL) {
                        return NULL;
                }

                if (rt->monitorcount > 0) {
                        memcpy(grown, rt->knownmonitors,
                                sizeof(ndag_monitor_t) * rt->monitorcount);

                        /* Fix up stream back-pointers while the old array
                         * is still valid */
                        for (i = 0; i < rt->sourcecount; i++) {
                                streamsock_t *src = &(rt->sources[i]);
                                if (src->monitorptr != NULL) {
                                        src->monitorptr = grown +
                                                (src->monitorptr - rt->knownmonitors);
                                }
                        }
                        free(rt->knownmonitors);
                }
                rt->knownmonitors = grown;
        }

        mon = &(rt->knownmonitors[rt->monitorcount]);
        mon->monitorid = monid;
        mon->laststart = 0;

        rt->monitorcount ++;
        return mon;
}
/* Start receiving the stream described by 'src' on receiver 'rt'.
 *
 * Allocates the stream's ring of receive buffers, joins its multicast group
 * and prepares the receive message headers. The stream is linked to the
 * receiver's record for its monitor, which is created on first use.
 *
 * @return the stream's port on success, or -1 on failure. On failure
 *         everything allocated for the new stream is released and
 *         rt->sourcecount is unchanged.
 */
static int add_new_streamsock(recvstream_t *rt, streamsource_t src) {
        streamsock_t *ssock = NULL;
        ndag_monitor_t *mon = NULL;
        int i;

        /* TODO consider replacing this with a list or vector so we can
         * easily remove sources that are no longer in use, rather than
         * just setting the sock to -1 and having to check them every
         * time we want to read a packet.
         */
        if (rt->sources == NULL) {
                rt->sources = (streamsock_t *)malloc(sizeof(streamsock_t) * 10);
                if (rt->sources == NULL) {
                        return -1;
                }
        } else if (rt->sourcecount > 0 && (rt->sourcecount % 10) == 0) {
                streamsock_t *grown = (streamsock_t *)realloc(rt->sources,
                                sizeof(streamsock_t) * (rt->sourcecount + 10));
                if (grown == NULL) {
                        return -1;
                }
                rt->sources = grown;
        }

        ssock = &(rt->sources[rt->sourcecount]);

        for (i = 0; i < rt->monitorcount; i++) {
                if (rt->knownmonitors[i].monitorid == src.monitor) {
                        mon = &(rt->knownmonitors[i]);
                        break;
                }
        }
        if (mon == NULL) {
                mon = add_new_knownmonitor(rt, src.monitor);
                if (mon == NULL) {
                        return -1;
                }
        }

        ssock->port = src.port;
        ssock->groupaddr = src.groupaddr;
        ssock->expectedseq = 0;
        ssock->monitorptr = mon;
        ssock->bufavail = ENCAP_BUFFERS;
        ssock->bufwaiting = 0;
        ssock->startidle = 0;
        ssock->nextts = 0;
        ssock->srcaddr = NULL;
        ssock->sock = -1;

        /* calloc so a partially filled array can be freed safely */
        ssock->saved = (char **)calloc(ENCAP_BUFFERS, sizeof(char *));
        if (ssock->saved == NULL) {
                return -1;
        }
        for (i = 0; i < ENCAP_BUFFERS; i++) {
                ssock->saved[i] = (char *)malloc(ENCAP_BUFSIZE);
                if (ssock->saved[i] == NULL) {
                        goto fail_buffers;
                }
                ssock->savedsize[i] = 0;
        }

        ssock->sock = join_multicast_group(src.groupaddr, src.localiface,
                        NULL, src.port, &(ssock->srcaddr));
        if (ssock->sock < 0) {
                ssock->sock = -1;
                goto fail_addr;
        }

#if HAVE_DECL_RECVMMSG
        for (i = 0; i < RECV_BATCH_SIZE; i++) {
                ssock->mmsgbufs[i].msg_hdr.msg_iov = NULL;
        }
        for (i = 0; i < RECV_BATCH_SIZE; i++) {
                ssock->mmsgbufs[i].msg_hdr.msg_iov = (struct iovec *)
                                malloc(sizeof(struct iovec));
                if (ssock->mmsgbufs[i].msg_hdr.msg_iov == NULL) {
                        goto fail_socket;
                }
                ssock->mmsgbufs[i].msg_hdr.msg_name = ssock->srcaddr->ai_addr;
                ssock->mmsgbufs[i].msg_hdr.msg_namelen = ssock->srcaddr->ai_addrlen;
                ssock->mmsgbufs[i].msg_hdr.msg_control = NULL;
                ssock->mmsgbufs[i].msg_hdr.msg_controllen = 0;
                ssock->mmsgbufs[i].msg_hdr.msg_flags = 0;
                ssock->mmsgbufs[i].msg_len = 0;
        }
#else
        /* recvmsg() reads msg_name, msg_control etc., so none of the
         * header may be left uninitialised */
        memset(&(ssock->singlemsg), 0, sizeof(ssock->singlemsg));
        ssock->singlemsg.msg_iov = (struct iovec *) calloc(1, sizeof(struct iovec));
        if (ssock->singlemsg.msg_iov == NULL) {
                goto fail_socket;
        }
        ssock->singlemsg.msg_name = ssock->srcaddr->ai_addr;
        ssock->singlemsg.msg_namelen = ssock->srcaddr->ai_addrlen;
#endif

        /* Only publish the descriptor once the stream is fully set up */
        if (ssock->sock > rt->maxfd) {
                rt->maxfd = ssock->sock;
        }

        ssock->nextread = NULL;
        ssock->nextreadind = 0;
        ssock->nextwriteind = 0;
        ssock->recordcount = 0;
        rt->sourcecount += 1;

        fprintf(stderr, "Added new stream %s:%u to thread %d\n",
                        ssock->groupaddr, ssock->port, rt->threadindex);

        return ssock->port;

fail_socket:
#if HAVE_DECL_RECVMMSG
        for (i = 0; i < RECV_BATCH_SIZE; i++) {
                free(ssock->mmsgbufs[i].msg_hdr.msg_iov);
                ssock->mmsgbufs[i].msg_hdr.msg_iov = NULL;
        }
#endif
        close(ssock->sock);
        ssock->sock = -1;
fail_addr:
        if (ssock->srcaddr) {
                freeaddrinfo(ssock->srcaddr);
                ssock->srcaddr = NULL;
        }
fail_buffers:
        for (i = 0; i < ENCAP_BUFFERS; i++) {
                free(ssock->saved[i]);
        }
        free(ssock->saved);
        ssock->saved = NULL;
        return -1;
}
/* Drain the receiver's queue of messages from the control thread.
 *
 * @return 1 once the queue is empty, 0 if a halt message was seen, or -1
 *         if joining a newly announced stream failed.
 */
static int receiver_read_messages(recvstream_t *rt) {
        ndag_internal_message_t msg;

        for (;;) {
                if (libtrace_message_queue_try_get(&(rt->mqueue),
                                (void *)&msg) == LIBTRACE_MQ_FAILED) {
                        return 1;
                }

                if (msg.type == NDAG_CLIENT_HALT) {
                        return 0;
                }

                if (msg.type == NDAG_CLIENT_NEWGROUP &&
                                add_new_streamsock(rt, msg.contents) < 0) {
                        return -1;
                }
                /* Any other message type is ignored */
        }
}
/* Report whether a stream has a filled buffer waiting to be read.
 * @return 1 if the socket is still open and the buffer at nextreadind
 *         holds data, otherwise 0.
 */
static inline int readable_data(streamsock_t *ssock) {
        int is_open = (ssock->sock != -1);

        return (is_open && ssock->savedsize[ssock->nextreadind] != 0) ? 1 : 0;
}
/* Forget the expected sequence number of every stream belonging to 'mon'
 * (used when the monitor is seen to restart), so the next record is not
 * counted as a gap.
 */
static inline void reset_expected_seqs(recvstream_t *rt, ndag_monitor_t *mon) {
        unsigned int idx;

        for (idx = 0; idx < rt->sourcecount; idx++) {
                streamsock_t *s = &(rt->sources[idx]);

                if (s->monitorptr != mon) {
                        continue;
                }
                s->expectedseq = 0;
        }
}
/* Point the receive message headers at the next free buffers.
 *
 * With recvmmsg, up to min(required, RECV_BATCH_SIZE) headers are
 * prepared. Header i uses buffer (nextwriteind + i), wrapping at
 * ENCAP_BUFFERS. Without recvmmsg, a single header is prepared.
 *
 * @return the number of headers prepared. Without recvmmsg, if
 *         required <= 0 a diagnostic is printed and
 *         TRACE_ERR_INIT_FAILED is returned instead.
 */
static int init_receivers(streamsock_t *ssock, int required) {
        int slot = ssock->nextwriteind;
#if HAVE_DECL_RECVMMSG
        int prepared = 0;

        while (prepared < required && prepared < RECV_BATCH_SIZE) {
                struct mmsghdr *mh = &(ssock->mmsgbufs[prepared]);

                if (slot >= ENCAP_BUFFERS) {
                        slot = 0;
                }
                mh->msg_len = 0;
                mh->msg_hdr.msg_iov->iov_base = ssock->saved[slot];
                mh->msg_hdr.msg_iov->iov_len = ENCAP_BUFSIZE;
                mh->msg_hdr.msg_iovlen = 1;

                slot++;
                prepared++;
        }
        return prepared;
#else
        if (required <= 0) {
                fprintf(stderr, "You are required to have atleast 1 receiver in init_receivers\n");
                return TRACE_ERR_INIT_FAILED;
        }

        ssock->singlemsg.msg_iov->iov_base = ssock->saved[slot];
        ssock->singlemsg.msg_iov->iov_len = ENCAP_BUFSIZE;
        ssock->singlemsg.msg_iovlen = 1;
        return 1;
#endif
}
/* Validate a datagram just received into buffer 'index' of a stream and,
 * if it carries encapsulated ERF records, commit it to the buffer ring.
 *
 * Also detects monitor restarts (a change in the encap header's start
 * time) and adds any gap in sequence numbers to rt->missing_records.
 *
 * @param msglen number of bytes received into saved[index]
 * @return 1 if the buffer now holds records to read; 0 if it was a
 *         keep-alive or too short to hold any records (the slot will be
 *         reused); -1 on an invalid record (the stream's socket is closed)
 *         or if no buffer space is left.
 */
static int check_ndag_received(streamsock_t *ssock, int index,
                unsigned int msglen, recvstream_t *rt) {

        ndag_encap_t *encaphdr;
        ndag_monitor_t *mon;
        uint8_t rectype;
        uint64_t started;
        uint32_t seqno;

        /* Check that we have a valid nDAG encap record */
        rectype = check_ndag_header(ssock->saved[index], (uint32_t)msglen);

        if (rectype == NDAG_PKT_KEEPALIVE) {
                /* Keep-alive: don't change nextwriteind, so this slot is
                 * overwritten with usable content. (The caller resets the
                 * stream's idle timer.) */
                return 0;
        } else if (rectype != NDAG_PKT_ENCAPERF) {
                fprintf(stderr, "Received invalid record on the channel for %s:%u.\n",
                                ssock->groupaddr, ssock->port);
                close(ssock->sock);
                ssock->sock = -1;
                return -1;
        }

        /* check_ndag_header() only guarantees the common header. The encap
         * header must be present too, followed by at least one byte of
         * record data, or the reader would walk into stale buffer contents. */
        if (msglen <= sizeof(ndag_common_t) + sizeof(ndag_encap_t)) {
                fprintf(stderr, "Received nDAG encap record with no records on the channel for %s:%u.\n",
                                ssock->groupaddr, ssock->port);
                return 0;
        }

        /* Check for space before committing, so state stays consistent */
        if (ssock->bufavail <= 0) {
                fprintf(stderr, "No space in buffer in check_ndag_received()\n");
                return -1;
        }

        ssock->savedsize[index] = msglen;
        ssock->nextwriteind ++;
        ssock->bufavail --;

        if (ssock->nextwriteind >= ENCAP_BUFFERS) {
                ssock->nextwriteind = 0;
        }

        /* Get the useful info from the encap header */
        encaphdr=(ndag_encap_t *)(ssock->saved[index] + sizeof(ndag_common_t));

        mon = ssock->monitorptr;
        started = bswap_be_to_host64(encaphdr->started);

        if (mon->laststart == 0) {
                mon->laststart = started;
        } else if (mon->laststart != started) {
                /* The monitor restarted: its sequence numbers start over */
                mon->laststart = started;
                reset_expected_seqs(rt, mon);

                /* TODO what is a good way to indicate this to clients?
                 * set the loss counter in the ERF header? a bit rude?
                 * use another bit in the ERF header?
                 * add a queryable flag to libtrace_packet_t?
                 */
        }

        seqno = ntohl(encaphdr->seqno);
        if (ssock->expectedseq != 0) {
                rt->missing_records += seq_cmp(seqno, ssock->expectedseq);
        }
        /* Sequence numbers skip zero */
        ssock->expectedseq = seqno + 1;
        if (ssock->expectedseq == 0) {
                ssock->expectedseq ++;
        }

        if (ssock->nextread == NULL) {
                /* If this is our first read, set up 'nextread'
                 * by skipping past the nDAG headers */
                ssock->nextread = ssock->saved[0] +
                        sizeof(ndag_common_t) + sizeof(ndag_encap_t);
        }
        return 1;
}
/* Pull any pending datagrams off one stream's socket without blocking.
 *
 * Each datagram is validated and committed by check_ndag_received(). If
 * nothing is pending, the stream's idle timer is started or checked, and
 * the socket is closed after NDAG_IDLE_TIMEOUT seconds of inactivity. Any
 * other receive error closes the socket immediately.
 *
 * @param tv      current time, filled in lazily (only if *gottime is 0)
 * @param gottime set to 1 once tv holds the current time
 * @return 1 if the stream has buffered records ready to read, else 0
 */
static int receive_from_single_socket(streamsock_t *ssock, struct timeval *tv,
                int *gottime, recvstream_t *rt) {

        int ret, ndagstat, avail;
        int toret = 0;

#if HAVE_DECL_RECVMMSG
        int i, firstslot;

        /* init_receivers() points message i at
         * saved[(firstslot + i) % ENCAP_BUFFERS] */
        firstslot = ssock->nextwriteind;
#endif

        avail = init_receivers(ssock, ssock->bufavail);

#if HAVE_DECL_RECVMMSG
        ret = recvmmsg(ssock->sock, ssock->mmsgbufs, avail,
                        MSG_DONTWAIT, NULL);
#else
        if (avail != 1) {
                return 0;
        }

        ret = recvmsg(ssock->sock, &(ssock->singlemsg), MSG_DONTWAIT);
#endif
        if (ret < 0) {
                /* Nothing to receive right now, but we should still
                 * count as 'ready' if at least one buffer is full */
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                        if (readable_data(ssock)) {
                                toret = 1;
                        }
                        if (!(*gottime)) {
                                gettimeofday(tv, NULL);
                                *gottime = 1;
                        }
                        if (ssock->startidle == 0) {
                                ssock->startidle = tv->tv_sec;
                        } else if (tv->tv_sec - ssock->startidle > NDAG_IDLE_TIMEOUT) {
                                fprintf(stderr,
                                        "Closing channel %s:%u due to inactivity.\n",
                                        ssock->groupaddr,
                                        ssock->port);

                                close(ssock->sock);
                                ssock->sock = -1;
                        }
                } else {

                        fprintf(stderr,
                                "Error receiving encapsulated records from %s:%u -- %s \n",
                                ssock->groupaddr, ssock->port,
                                strerror(errno));
                        close(ssock->sock);
                        ssock->sock = -1;
                }
                return toret;
        }

        ssock->startidle = 0;

#if HAVE_DECL_RECVMMSG
        for (i = 0; i < ret; i++) {
                int slot = (firstslot + i) % ENCAP_BUFFERS;
                unsigned int msglen = ssock->mmsgbufs[i].msg_len;

                /* If an earlier datagram in this batch was not kept (e.g. a
                 * keep-alive), nextwriteind lags behind the slot this one
                 * landed in. Move it down so the saved buffers stay
                 * contiguous -- otherwise check_ndag_received() would look
                 * at the stale keep-alive and drop this datagram too. */
                if (slot != ssock->nextwriteind) {
                        memcpy(ssock->saved[ssock->nextwriteind],
                                ssock->saved[slot],
                                msglen < ENCAP_BUFSIZE ? msglen : ENCAP_BUFSIZE);
                }

                ndagstat = check_ndag_received(ssock, ssock->nextwriteind,
                                msglen, rt);
                if (ndagstat == -1) {
                        break;
                }

                if (ndagstat == 1) {
                        toret = 1;
                }
        }
#else
        ndagstat = check_ndag_received(ssock, ssock->nextwriteind, ret, rt);
        if (ndagstat <= 0) {
                toret = 0;
        } else {
                toret = 1;
        }
#endif

        return toret;
}
/* Poll every open stream socket on this receiver without blocking and pull
 * in whatever datagrams are waiting.
 *
 * Streams that already hold plenty of filled buffers are not polled; they
 * simply count as ready. Closed streams (sock == -1) are skipped.
 *
 * @return a positive count if any stream has data ready (callers only
 *         test it against zero), 0 if none, or -1 if select() fails.
 */
static int receive_from_sockets(recvstream_t *rt) {

        int i, readybufs, gottime;
        struct timeval tv;
        fd_set fds;
        int maxfd = -1;
        struct timeval zerotv;

        readybufs = 0;
        gottime = 0;

        if (rt->maxfd == -1) {
                return 0;
        }

        /* Clear the set up front. Using maxfd == 0 as a "not yet cleared"
         * marker broke when a stream socket was descriptor 0. */
        FD_ZERO(&fds);

        for (i = 0; i < rt->sourcecount; i++) {
                if (rt->sources[i].sock == -1) {
                        continue;
                }

#if HAVE_DECL_RECVMMSG
                /* Plenty of full buffers, just use the packets in those */
                if (rt->sources[i].bufavail < RECV_BATCH_SIZE / 2) {
                        readybufs ++;
                        continue;
                }
#else
                if (rt->sources[i].bufavail == 0) {
                        readybufs ++;
                        continue;
                }
#endif
                FD_SET(rt->sources[i].sock, &fds);
                if (maxfd < rt->sources[i].sock) {
                        maxfd = rt->sources[i].sock;
                }
        }

        if (maxfd < 0) {
                return readybufs;
        }

        zerotv.tv_sec = 0;
        zerotv.tv_usec = 0;

        if (select(maxfd + 1, &fds, NULL, NULL, &zerotv) == -1) {
                /* A signal interrupted the poll: report what we know */
                if (errno == EINTR) {
                        return readybufs;
                }
                return -1;
        }

        for (i = 0; i < rt->sourcecount; i++) {
                /* Closed streams were never added to the set, and
                 * FD_ISSET(-1, ...) is undefined behaviour */
                if (rt->sources[i].sock == -1) {
                        continue;
                }
                if (!FD_ISSET(rt->sources[i].sock, &fds)) {
                        if (rt->sources[i].bufavail < ENCAP_BUFFERS) {
                                readybufs ++;
                        }
                        continue;
                }
                readybufs += receive_from_single_socket(&(rt->sources[i]),
                                &tv, &gottime, rt);
        }

        return readybufs;
}
/* Block until at least one stream on this receiver has records ready.
 *
 * Releases the packet's old buffer if libtrace owns it, then repeatedly:
 * checks for a halt, drains control-thread messages, and polls the
 * sockets, sleeping briefly whenever there is nothing to do.
 *
 * @return a positive value once data is ready; the is_halted() code if the
 *         trace was halted; 0 if a halt message arrived; negative on error.
 */
static int receive_encap_records_block(libtrace_t *libtrace, recvstream_t *rt,
                libtrace_packet_t *packet) {
        int status;

        if (packet->buf_control == TRACE_CTRL_PACKET) {
                free(packet->buffer);
                packet->buffer = NULL;
        }

        for (;;) {
                status = is_halted(libtrace);
                if (status != -1) {
                        return status;
                }

                status = receiver_read_messages(rt);
                if (status <= 0) {
                        return status;
                }

                if (rt->sourcecount == 0) {
                        /* No streams joined yet: give the control thread
                         * time to assign some */
                        usleep(10000);
                        continue;
                }

                status = receive_from_sockets(rt);
                if (status != 0) {
                        /* > 0: data ready, < 0: error -- done either way */
                        return status;
                }

                /* Nothing ready on any stream; back off briefly */
                usleep(100);
        }
}
/* Non-blocking variant of receive_encap_records_block(): one poll only,
 * and control-thread messages are not drained here.
 *
 * @return the is_halted() code if halted, 0 if no streams are joined,
 *         otherwise the result of receive_from_sockets().
 */
static int receive_encap_records_nonblock(libtrace_t *libtrace, recvstream_t *rt,
                libtrace_packet_t *packet) {
        int halted;

        if (packet->buf_control == TRACE_CTRL_PACKET) {
                free(packet->buffer);
                packet->buffer = NULL;
        }

        halted = is_halted(libtrace);
        if (halted != -1) {
                return halted;
        }

        /* Without any joined streams there is nothing to poll */
        return (rt->sourcecount == 0) ? 0 : receive_from_sockets(rt);
}
/* Choose the readable stream whose next record has the earliest ERF
 * timestamp.
 *
 * Each stream's next timestamp is cached in nextts (0 = not cached). With
 * a single stream, no timestamp is examined at all.
 *
 * @return the chosen stream, or NULL if no stream has readable data.
 */
static streamsock_t *select_next_packet(recvstream_t *rt) {
        streamsock_t *best = NULL;
        uint64_t bestts = 0;
        int idx;

        if (rt->sourcecount == 1) {
                streamsock_t *only = &(rt->sources[0]);
                return readable_data(only) ? only : NULL;
        }

        for (idx = 0; idx < rt->sourcecount; idx++) {
                streamsock_t *cand = &(rt->sources[idx]);
                uint64_t ts;

                if (!readable_data(cand)) {
                        continue;
                }

                ts = cand->nextts;
                if (ts == 0) {
                        dag_record_t *erf = (dag_record_t *)(cand->nextread);
                        ts = bswap_le_to_host64(erf->ts);
                        cand->nextts = ts;
                }

                if (bestts == 0 || ts < bestts) {
                        bestts = ts;
                        best = cand;
                }
        }
        return best;
}
/* Non-parallel read: block until a record is available on receiver 0,
 * then prepare 'packet' from the stream with the earliest timestamp.
 *
 * @return the record length, 0 if halted / nothing selectable, or a
 *         negative value on error.
 */
static int ndag_read_packet(libtrace_t *libtrace, libtrace_packet_t *packet) {
        recvstream_t *rt = &(FORMAT_DATA->receivers[0]);
        streamsock_t *src;
        int status;

        status = receive_encap_records_block(libtrace, rt, packet);
        if (status <= 0) {
                return status;
        }

        src = select_next_packet(rt);
        if (src == NULL) {
                return 0;
        }

        /* nextread points at an ERF header, so this becomes an ERF packet */
        status = ndag_prepare_packet_stream(libtrace, rt, src, packet,
                        TRACE_PREP_DO_NOT_OWN_BUFFER);

        /* Buffers fully consumed by this read may now be refilled */
        src->bufavail += src->bufwaiting;
        src->bufwaiting = 0;

        return status;
}
/* Parallel read: fill up to nb_packets packets for per-packet thread 't'.
 *
 * Blocks only for the first packet of the batch; afterwards it takes
 * whatever records are already buffered. Consumed buffers are returned to
 * their streams before returning.
 *
 * @return the number of packets read, 0 if halted, or negative on error
 *         (including a corrupt record, reported by
 *         ndag_prepare_packet_stream()).
 */
static int ndag_pread_packets(libtrace_t *libtrace, libtrace_thread_t *t,
                libtrace_packet_t **packets, size_t nb_packets) {

        recvstream_t *rt;
        int rem, i;
        int failed = 0;
        size_t read_packets = 0;
        streamsock_t *nextavail = NULL;

        rt = (recvstream_t *)t->format_data;

        /* Nothing to fill -- don't touch packets[0] */
        if (nb_packets == 0) {
                return 0;
        }

        do {
                /* Only check for messages once per batch */
                if (read_packets == 0) {
                        rem = receive_encap_records_block(libtrace, rt,
                                packets[read_packets]);
                        if (rem < 0) {
                                return rem;
                        }

                        if (rem == 0) {
                                break;
                        }
                }

                nextavail = select_next_packet(rt);
                if (nextavail == NULL) {
                        break;
                }

                if (ndag_prepare_packet_stream(libtrace, rt, nextavail,
                                packets[read_packets],
                                TRACE_PREP_DO_NOT_OWN_BUFFER) < 0) {
                        /* Corrupt record; the error is already set */
                        failed = 1;
                        break;
                }
                read_packets ++;
                if (read_packets >= nb_packets) {
                        break;
                }
        } while (1);

        for (i = 0; i < rt->sourcecount; i++) {
                streamsock_t *src = &(rt->sources[i]);
                src->bufavail += src->bufwaiting;
                src->bufwaiting = 0;
                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in ndag_pread_packets()");
                        return -1;
                }
        }

        if (failed) {
                return -1;
        }
        return read_packets;
}
/* Event-API read for nDAG on receiver 0; never blocks.
 *
 * Returns TRACE_EVENT_PACKET with the next record (after applying any
 * filter and snap length), TRACE_EVENT_SLEEP when nothing is ready, or
 * TRACE_EVENT_TERMINATE when the trace is halted, a halt message arrives
 * or an error occurs (the libtrace error is set for errors).
 */
static libtrace_eventobj_t trace_event_ndag(libtrace_t *libtrace,
                libtrace_packet_t *packet) {

        libtrace_eventobj_t event = {0,0,0.0,0};
        int rem, i;
        streamsock_t *nextavail = NULL;

        /* Only check for messages once per call */
        rem = receiver_read_messages(&(FORMAT_DATA->receivers[0]));
        if (rem <= 0) {
                event.type = TRACE_EVENT_TERMINATE;
                return event;
        }

        do {
                rem = receive_encap_records_nonblock(libtrace,
                                &(FORMAT_DATA->receivers[0]), packet);

                if (rem < 0) {
                        trace_set_err(libtrace, TRACE_ERR_BAD_PACKET,
                                "Received invalid nDAG records.");
                        event.type = TRACE_EVENT_TERMINATE;
                        break;
                }

                if (rem == 0) {
                        /* Either we've been halted or we've got no packets
                         * right now. */
                        if (is_halted(libtrace) == 0) {
                                event.type = TRACE_EVENT_TERMINATE;
                                break;
                        }
                        event.type = TRACE_EVENT_SLEEP;
                        event.seconds = 0.0001;
                        break;
                }

                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]));
                if (nextavail == NULL) {
                        event.type = TRACE_EVENT_SLEEP;
                        event.seconds = 0.0001;
                        break;
                }

                /* A negative return means a corrupt record; the libtrace
                 * error has already been set */
                if (ndag_prepare_packet_stream(libtrace,
                                &(FORMAT_DATA->receivers[0]), nextavail,
                                packet, TRACE_PREP_DO_NOT_OWN_BUFFER) < 0) {
                        event.type = TRACE_EVENT_TERMINATE;
                        break;
                }
                event.type = TRACE_EVENT_PACKET;
                event.size = trace_get_capture_length(packet) +
                                trace_get_framing_length(packet);

                if (libtrace->filter) {
                        int filtret = trace_apply_filter(libtrace->filter,
                                        packet);
                        if (filtret == -1) {
                                trace_set_err(libtrace,
                                                TRACE_ERR_BAD_FILTER,
                                                "Bad BPF Filter");
                                event.type = TRACE_EVENT_TERMINATE;
                                break;
                        }

                        if (filtret == 0) {
                                /* Didn't match filter, try next one */
                                libtrace->filtered_packets ++;
                                trace_clear_cache(packet);
                                continue;
                        }
                }

                if (libtrace->snaplen > 0) {
                        trace_set_capture_length(packet, libtrace->snaplen);
                }
                libtrace->accepted_packets ++;
                break;
        } while (1);

        /* Return fully consumed buffers to their streams */
        for (i = 0; i < FORMAT_DATA->receivers[0].sourcecount; i++) {
                streamsock_t *src = &(FORMAT_DATA->receivers[0].sources[i]);
                src->bufavail += src->bufwaiting;
                src->bufwaiting = 0;
                if (src->bufavail < 0 || src->bufavail > ENCAP_BUFFERS) {
                        trace_set_err(libtrace, TRACE_ERR_BAD_IO, "Not enough buffer space in trace_event_ndag()");
                        break;
                }
        }

        return event;
}
/* Report dropped (upstream lctr), received and missing (sequence-gap)
 * counts summed over all receivers.
 *
 * ndag_start_input() creates a single receiver, and perpkt_thread_count
 * can be 0, so at least one receiver is always counted. All counts are
 * zero if the input was never started.
 */
static void ndag_get_statistics(libtrace_t *libtrace, libtrace_stat_t *stat) {
        int i, count;

        stat->dropped_valid = 1;
        stat->dropped = 0;
        stat->received_valid = 1;
        stat->received = 0;
        stat->missing_valid = 1;
        stat->missing = 0;

        if (FORMAT_DATA == NULL || FORMAT_DATA->receivers == NULL) {
                return;
        }

        count = libtrace->perpkt_thread_count;
        if (count <= 0) {
                count = 1;
        }

        /* TODO Is this thread safe? */
        for (i = 0; i < count; i++) {
                stat->dropped += FORMAT_DATA->receivers[i].dropped_upstream;
                stat->received += FORMAT_DATA->receivers[i].received_packets;
                stat->missing += FORMAT_DATA->receivers[i].missing_records;
        }
}
/* Report the counters of the receiver attached to thread 't'. Nothing is
 * written if there is no trace, thread or attached receiver.
 */
static void ndag_get_thread_stats(libtrace_t *libtrace, libtrace_thread_t *t,
                libtrace_stat_t *stat) {
        recvstream_t *recvr;

        if (libtrace == NULL || t == NULL)
                return;

        /* Only per-packet reader threads are given a receiver (see
         * ndag_pregister_thread) */
        recvr = (recvstream_t *)t->format_data;
        if (recvr == NULL)
                return;

        /* TODO Is this thread safe */
        stat->dropped_valid = 1;
        stat->dropped = recvr->dropped_upstream;

        stat->received_valid = 1;
        stat->received = recvr->received_packets;

        stat->missing_valid = 1;
        stat->missing = recvr->missing_records;
}
/* Attach a per-packet reader thread to its receiver, indexed by the
 * thread's perpkt_num. Other threads are left untouched.
 * @return always 0
 */
static int ndag_pregister_thread(libtrace_t *libtrace, libtrace_thread_t *t,
                bool reader) {
        if (reader && t->type == THREAD_PERPKT) {
                t->format_data = &(FORMAT_DATA->receivers[t->perpkt_num]);
        }
        return 0;
}
/* Format table for the nDAG input. It is input only (no output callbacks),
 * flagged as live capture, and records are ERF, so the per-packet accessors
 * are mostly the shared ERF ones.
 */
static struct libtrace_format_t ndag = {

        "ndag",
        "",
        TRACE_FORMAT_NDAG,

        NULL,                   /* probe filename */
        NULL,                   /* probe magic */
        ndag_init_input,        /* init_input */
        ndag_config_input,      /* config_input */
        ndag_start_input,       /* start_input */
        ndag_pause_input,       /* pause_input */
        NULL,                   /* init_output */
        NULL,                   /* config_output */
        NULL,                   /* start_output */
        ndag_fin_input,         /* fin_input */
        NULL,                   /* fin_output */
        ndag_read_packet,       /* read_packet */
        ndag_prepare_packet,    /* prepare_packet */
        NULL,                   /* fin_packet */
        NULL,                   /* write_packet */
        NULL,                   /* flush_output */
        erf_get_link_type,      /* get_link_type */
        erf_get_direction,      /* get_direction */
        erf_set_direction,      /* set_direction */
        erf_get_erf_timestamp,  /* get_erf_timestamp */
        NULL,                   /* get_timeval */
        NULL,                   /* get_seconds */
        NULL,                   /* get_timespec */
        NULL,                   /* seek_erf */
        NULL,                   /* seek_timeval */
        NULL,                   /* seek_seconds */
        erf_get_capture_length, /* get_capture_length */
        erf_get_wire_length,    /* get_wire_length */
        ndag_get_framing_length, /* get_framing_length */
        erf_set_capture_length, /* set_capture_length */
        NULL,                   /* get_received_packets */
        NULL,                   /* get_filtered_packets */
        NULL,                   /* get_dropped_packets */
        ndag_get_statistics,    /* get_statistics */
        NULL,                   /* get_fd */
        trace_event_ndag,       /* trace_event */
        NULL,                   /* help */
        NULL,                   /* next pointer */
        {true, 0},              /* live packet capture */
        ndag_pstart_input,      /* parallel start */
        ndag_pread_packets,     /* parallel read */
        ndag_pause_input,       /* parallel pause */
        NULL,
        ndag_pregister_thread,  /* register thread */
        NULL,
        ndag_get_thread_stats   /* per-thread stats */
};
/* Register the nDAG input format with libtrace. */
void ndag_constructor(void) {
        struct libtrace_format_t *fmt = &ndag;

        register_format(fmt);
}