source: lib/format_etsilive.c @ 00365c6

cachetimestampsdevelopetsiliverc-4.0.4ringdecrementfixringperformance
Last change on this file since 00365c6 was 00365c6, checked in by Shane Alcock <salcock@…>, 3 years ago

Update to use new libwandder_etsili API

The new API should be more thread-safe than the original, at the
cost of being a bit more awkward to use.

  • Property mode set to 100644
File size: 21.3 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "data-struct/simple_circular_buffer.h"
35
36#include <libwandder.h>
37#include <libwandder_etsili.h>
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45#include <unistd.h>
46#include <sys/socket.h>
47#include <sys/types.h>
48#include <netdb.h>
49
50#define ETSI_RECVBUF_SIZE (64 * 1024 * 1024)
51
52#define FORMAT_DATA ((etsilive_format_data_t *)libtrace->format_data)
53
54typedef struct etsipktcache {
55
56        uint64_t timestamp;
57        uint16_t length;
58
59} etsi_packet_cache_t;
60
61typedef struct etsisocket {
62        int sock;
63        struct sockaddr *srcaddr;
64
65        libtrace_scb_t recvbuffer;
66        etsi_packet_cache_t cached;
67
68} etsisocket_t;
69
70typedef struct etsithread {
71        libtrace_message_queue_t mqueue;
72        etsisocket_t *sources;
73        uint16_t sourcecount;
74        int threadindex;
75        wandder_etsispec_t *etsidec;
76} etsithread_t;
77
78typedef struct etsilive_format_data {
79        char *listenport;
80        char *listenaddr;
81
82        pthread_t listenthread;
83        etsithread_t *receivers;
84        int nextthreadid;
85} etsilive_format_data_t;
86
87typedef struct newsendermessage {
88        int recvsock;
89        struct sockaddr *recvaddr;
90} newsend_message_t;
91
92
93static void *etsi_listener(void *tdata) {
94        libtrace_t *libtrace = (libtrace_t *)tdata;
95        struct addrinfo hints, *listenai;
96        struct sockaddr_storage *connected;
97        socklen_t addrsize;
98        int sock, consock;
99        int reuse = 1;
100
101        hints.ai_family = PF_UNSPEC;
102        hints.ai_socktype = SOCK_STREAM;
103        hints.ai_flags = AI_PASSIVE;
104        hints.ai_protocol = 0;
105
106        sock = -1;
107        listenai = NULL;
108
109        if (getaddrinfo(FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
110                        &hints, &listenai) != 0) {
111                fprintf(stderr,
112                        "Call to getaddrinfo failed for %s:%s -- %s\n",
113                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
114                        strerror(errno));
115                goto listenerror;
116        }
117
118        sock = socket(listenai->ai_family, listenai->ai_socktype, 0);
119        if (sock < 0) {
120                fprintf(stderr, "Failed to create socket for %s:%s -- %s\n",
121                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
122                        strerror(errno));
123                goto listenerror;
124        }
125
126        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
127                        < 0) {
128
129                fprintf(stderr, "Failed to configure socket for %s:%s -- %s\n",
130                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
131                        strerror(errno));
132
133                goto listenerror;
134        }
135
136        if (bind(sock, (struct sockaddr *)listenai->ai_addr,
137                        listenai->ai_addrlen) < 0) {
138
139                fprintf(stderr, "Failed to bind socket for %s:%s -- %s\n",
140                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
141                        strerror(errno));
142                goto listenerror;
143        }
144
145        if (listen(sock, 10) < 0) {
146                fprintf(stderr, "Failed to listen on socket for %s:%s -- %s\n",
147                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
148                        strerror(errno));
149                goto listenerror;
150        }
151
152        freeaddrinfo(listenai);
153
154        /* TODO consider possibility of pausing and resuming? */
155        while ((is_halted(libtrace) == -1)) {
156                newsend_message_t msg;
157                etsithread_t *et;
158
159                /* accept */
160                connected = (struct sockaddr_storage *)malloc(sizeof(struct
161                                sockaddr_storage));
162                addrsize = sizeof(struct sockaddr_storage);
163                consock = accept(sock, (struct sockaddr *)connected,
164                                &addrsize);
165                if (consock < 0) {
166                        fprintf(stderr, "Failed to accept connection on socket for %s:%s -- %s\n",
167                                FORMAT_DATA->listenaddr,
168                                FORMAT_DATA->listenport,
169                                strerror(errno));
170                        free(connected);
171                        goto listenerror;
172                }
173
174                /* if successful, send consock to next available thread */
175                msg.recvsock = consock;
176                msg.recvaddr = (struct sockaddr *)connected;
177                et = &(FORMAT_DATA->receivers[FORMAT_DATA->nextthreadid]);
178                libtrace_message_queue_put(&(et->mqueue), (void *)&msg);
179
180                if (libtrace->perpkt_thread_count > 0) {
181                        FORMAT_DATA->nextthreadid =
182                                ((FORMAT_DATA->nextthreadid + 1) %
183                                libtrace->perpkt_thread_count);
184                }
185        }
186
187        goto listenshutdown;
188
189listenerror:
190        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
191                "Unable to create listening socket for etsilive");
192
193listenshutdown:
194        if (sock >= 0) {
195                close(sock);
196        }
197        if (listenai) {
198                freeaddrinfo(listenai);
199        }
200        if (!is_halted(libtrace)) {
201                trace_interrupt();
202        }
203        pthread_exit(NULL);
204}
205
206
207
208static int etsilive_init_input(libtrace_t *libtrace) {
209        char *scan = NULL;
210        libtrace->format_data = (etsilive_format_data_t *)malloc(
211                        sizeof(etsilive_format_data_t));
212
213        FORMAT_DATA->receivers = NULL;
214        FORMAT_DATA->nextthreadid = 0;
215        FORMAT_DATA->listenaddr = NULL;
216
217        /* TODO is there a sensible default port number? */
218        scan = strchr(libtrace->uridata, ':');
219        if (scan == NULL) {
220                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
221                        "Bad etsilive URI. Should be etsilive:<listenaddr>:<port number>");
222                return -1;
223        }
224        FORMAT_DATA->listenaddr = strndup(libtrace->uridata,
225                        (size_t)(scan - libtrace->uridata));
226        FORMAT_DATA->listenport = strdup(scan + 1);
227
228        return 0;
229}
230
231static int etsilive_fin_input(libtrace_t *libtrace) {
232        if (FORMAT_DATA->receivers) {
233                free(FORMAT_DATA->receivers);
234        }
235
236        if (FORMAT_DATA->listenaddr) {
237                free(FORMAT_DATA->listenaddr);
238        }
239        if (FORMAT_DATA->listenport) {
240                free(FORMAT_DATA->listenport);
241        }
242        free(libtrace->format_data);
243        return 0;
244}
245
246static int etsilive_start_threads(libtrace_t *libtrace, uint32_t maxthreads) {
247        int ret;
248        uint32_t i;
249        /* Configure the set of receiver threads */
250
251        if (FORMAT_DATA->receivers == NULL) {
252                /* What if the number of threads changes between a pause and
253                 * a restart? Can this happen? */
254                FORMAT_DATA->receivers = (etsithread_t *)
255                                malloc(sizeof(etsithread_t) * maxthreads);
256        }
257
258        for (i = 0; i < maxthreads; i++) {
259
260                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
261                                sizeof(newsend_message_t));
262
263                FORMAT_DATA->receivers[i].sources = NULL;
264                FORMAT_DATA->receivers[i].sourcecount = 0;
265                FORMAT_DATA->receivers[i].threadindex = i;
266                FORMAT_DATA->receivers[i].etsidec =
267                                wandder_create_etsili_decoder();
268
269        }
270
271        /* Start the listening thread */
272        /* TODO consider affinity of this thread? */
273
274        ret = pthread_create(&(FORMAT_DATA->listenthread), NULL,
275                        etsi_listener, libtrace);
276        if (ret != 0) {
277                return -1;
278        }
279        return maxthreads;
280}
281
282static int etsilive_start_input(libtrace_t *libtrace) {
283        return etsilive_start_threads(libtrace, 1);
284}
285
286static void halt_etsi_thread(etsithread_t *receiver) {
287        int i;
288        libtrace_message_queue_destroy(&(receiver->mqueue));
289        if (receiver->sources == NULL)
290                return;
291        for (i = 0; i < receiver->sourcecount; i++) {
292                etsisocket_t src = receiver->sources[i];
293                libtrace_scb_destroy(&(src.recvbuffer));
294                close(src.sock);
295        }
296        wandder_free_etsili_decoder(receiver->etsidec);
297        free(receiver->sources);
298}
299
300static int etsilive_pause_input(libtrace_t *libtrace) {
301
302        int i;
303        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
304                halt_etsi_thread(&(FORMAT_DATA->receivers[i]));
305        }
306        return 0;
307
308}
309
310static int receiver_read_message(etsithread_t *et) {
311        newsend_message_t msg;
312
313        while (libtrace_message_queue_try_get(&(et->mqueue), (void *)&msg)
314                        != LIBTRACE_MQ_FAILED) {
315                etsisocket_t *esock = NULL;
316
317                if (et->sourcecount == 0) {
318                        et->sources = (etsisocket_t *)malloc(
319                                        sizeof(etsisocket_t) * 10);
320                } else if ((et->sourcecount % 10) == 0) {
321                        et->sources = (etsisocket_t *)realloc(et->sources,
322                                sizeof(etsisocket_t) * (et->sourcecount + 10));
323                }
324
325                esock = &(et->sources[et->sourcecount]);
326                esock->sock = msg.recvsock;
327                esock->srcaddr = msg.recvaddr;
328                libtrace_scb_init(&(esock->recvbuffer), ETSI_RECVBUF_SIZE,
329                                et->threadindex);
330                esock->cached.timestamp = 0;
331                esock->cached.length = 0;
332
333                et->sourcecount += 1;
334
335                fprintf(stderr, "Thread %d is now handling %u sources.\n",
336                                et->threadindex, et->sourcecount);
337        }
338        return 1;
339}
340
341static void receive_from_single_socket(etsisocket_t *esock) {
342
343        int ret = 0;
344
345        if (esock->sock == -1) {
346                return;
347        }
348
349        ret = libtrace_scb_recv_sock(&(esock->recvbuffer), esock->sock,
350                        MSG_DONTWAIT);
351        if (ret == -1) {
352                if (errno == EAGAIN || errno == EWOULDBLOCK) {
353                        /* Would have blocked, nothing available */
354                        return;
355                }
356                fprintf(stderr, "Error receiving on socket %d: %s\n",
357                                esock->sock, strerror(errno));
358                close(esock->sock);
359                esock->sock = -1;
360        }
361
362        if (ret == 0) {
363                fprintf(stderr, "Socket %d has disconnected\n", esock->sock);
364                close(esock->sock);
365                esock->sock = -1;
366        }
367
368}
369
370static int receive_etsi_sockets(libtrace_t *libtrace, etsithread_t *et) {
371
372        int iserr = 0;
373        int i;
374
375        if ((iserr = is_halted(libtrace)) != -1) {
376                return iserr;
377        }
378
379        iserr = receiver_read_message(et);
380        if (iserr <= 0) {
381                return iserr;
382        }
383
384        if (et->sourcecount == 0) {
385                return 1;
386        }
387
388        for (i = 0; i < et->sourcecount; i++) {
389                receive_from_single_socket(&(et->sources[i]));
390        }
391        return 1;
392
393}
394
395static etsisocket_t *select_next_packet(etsithread_t *et, libtrace_t *libtrace) {
396
397        int i;
398        etsisocket_t *esock = NULL;
399        uint64_t earliest = 0;
400        uint64_t current;
401        struct timeval tv;
402        uint32_t available;
403        uint8_t *ptr = NULL;
404        wandder_decoder_t dec;
405        uint32_t reclen = 0;
406
407        for (i = 0; i < et->sourcecount; i++) {
408                if (et->sources[i].sock == -1) {
409                        continue;
410                }
411                /* Have we already successfully decoded this? Cool,
412                 * just use whatever we cached last time.
413                 */
414                if (et->sources[i].cached.timestamp != 0) {
415                        current = et->sources[i].cached.timestamp;
416
417                        if (earliest == 0 || earliest > current) {
418                                earliest = current;
419                                esock = &(et->sources[i]);
420                        }
421                        continue;
422                }
423
424                ptr = libtrace_scb_get_read(&(et->sources[i].recvbuffer),
425                                &available);
426
427                if (available == 0 || ptr == NULL) {
428                        continue;
429                }
430
431                wandder_attach_etsili_buffer(et->etsidec, ptr, available, false);
432                if (et->sources[i].cached.length != 0) {
433                        reclen = et->sources[i].cached.length;
434                } else {
435                        reclen = wandder_etsili_get_pdu_length(et->etsidec);
436
437                        if (reclen == 0) {
438                                continue;
439                        }
440                }
441
442                if (available < reclen) {
443                        /* Don't have the whole PDU yet */
444                        continue;
445                }
446
447                /* Get the timestamp */
448
449                tv = wandder_etsili_get_header_timestamp(et->etsidec);
450                if (tv.tv_sec == 0) {
451                        continue;
452                }
453                current = ((((uint64_t)tv.tv_sec) << 32) +
454                                (((uint64_t)tv.tv_usec << 32)/1000000)); 
455
456                /* Success, cache everything we used so we don't have to
457                 * decode this packet again.
458                 */
459                et->sources[i].cached.timestamp = current;
460                et->sources[i].cached.length = reclen;
461
462
463                /* Don't forget to update earliest and esock... */
464                if (current < earliest || earliest == 0) {
465                        esock = &(et->sources[i]);
466                        earliest = current;
467                }
468
469        }
470        return esock;
471}
472
473static int etsilive_prepare_received(libtrace_t *libtrace, etsithread_t *et,
474                etsisocket_t *esock, libtrace_packet_t *packet) {
475
476        uint32_t available = 0;
477
478        packet->trace = libtrace;
479        packet->buffer = libtrace_scb_get_read(&(esock->recvbuffer),
480                                        &available);
481        packet->buf_control = TRACE_CTRL_EXTERNAL;
482        packet->header = NULL;          // Check this is ok to do
483        packet->payload = packet->buffer;
484        packet->type = TRACE_RT_DATA_ETSILI;
485        packet->order = esock->cached.timestamp;
486        packet->error = esock->cached.length;
487
488        packet->wire_length = esock->cached.length;
489        packet->capture_length = esock->cached.length;
490
491        /*
492        for (j = 0; j < packet->wire_length; j++) {
493                printf("%02x ", *(((uint8_t *)packet->buffer) + j));
494                if ((j % 16) == 15) printf("\n");
495        }
496        printf("\n");
497        */
498
499        /* Advance the read pointer for this buffer
500         * TODO should really do this in fin_packet, but will need a ref
501         * to esock to do this properly */
502        libtrace_scb_advance_read(&(esock->recvbuffer), esock->cached.length);
503        esock->cached.length = 0;
504        esock->cached.timestamp = 0;
505
506
507        return 1;
508}
509
510
511static int etsilive_read_packet(libtrace_t *libtrace,
512                libtrace_packet_t *packet) {
513
514        etsisocket_t *nextavail = NULL;
515        int ret;
516
517        while (1) {
518                /* Read from sockets for any buffers that do not have
519                 * a complete packet */
520                ret = receive_etsi_sockets(libtrace,
521                                &(FORMAT_DATA->receivers[0]));
522                if (ret <= 0) {
523                        return ret;
524                }
525
526                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]),
527                                libtrace);
528                if (nextavail == NULL) {
529                        /* No complete packets available, take a short
530                         * break before trying again. */
531                        if (FORMAT_DATA->receivers[0].sourcecount == 0) {
532                                /* No sources yet, so we can wait a bit
533                                 * longer. */
534                                usleep(10000);
535                        } else {
536                                usleep(100);
537                        }
538                        continue;
539                }
540                break;
541        }
542
543        return etsilive_prepare_received(libtrace,
544                        &(FORMAT_DATA->receivers[0]), nextavail,
545                        packet);
546}
547
548static int etsilive_prepare_packet(libtrace_t *libtrace UNUSED,
549                libtrace_packet_t *packet UNUSED,
550                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
551                uint32_t flags UNUSED) {
552        return 0;
553}
554
555static int etsilive_get_pdu_length(const libtrace_packet_t *packet) {
556
557        /* Should never get here because cache is set when packet is read */
558        wandder_etsispec_t *dec;
559        size_t reclen;
560
561        /* 0 should be ok here for quickly evaluating the first length
562         * field... */
563        dec = wandder_create_etsili_decoder();
564        wandder_attach_etsili_buffer(dec, packet->buffer, 0, false);
565        reclen = (size_t)wandder_etsili_get_pdu_length(dec);
566        wandder_free_etsili_decoder(dec);
567
568        return reclen;
569}
570
571static int etsilive_get_framing_length(const libtrace_packet_t *packet) {
572
573        return 0;
574}
575
576static struct timeval etsilive_get_timeval(const libtrace_packet_t *packet) {
577        /* TODO add cached timestamps to libtrace so we don't have to look
578         * this up again. */
579        struct timeval tv;
580        wandder_etsispec_t *dec;
581
582        tv.tv_sec = 0;
583        tv.tv_usec = 0;
584
585
586        dec = wandder_create_etsili_decoder();
587        wandder_attach_etsili_buffer(dec, packet->buffer, 0, false);
588        tv = wandder_etsili_get_header_timestamp(dec);
589        wandder_free_etsili_decoder(dec);
590        return tv;
591}
592
593static libtrace_linktype_t etsilive_get_link_type(
594                const libtrace_packet_t *packet) {
595        return TRACE_TYPE_ETSILI;
596}
597
598static struct libtrace_format_t etsilive = {
599        "etsilive",
600        "$Id$",
601        TRACE_FORMAT_ETSILIVE,
602        NULL,                           /* probe filename */
603        NULL,                           /* probe magic */
604        etsilive_init_input,            /* init_input */
605        NULL,                           /* config_input */
606        etsilive_start_input,           /* staetsilive_input */
607        etsilive_pause_input,           /* pause */
608        NULL,                           /* init_output */
609        NULL,                           /* config_output */
610        NULL,                           /* staetsilive_output */
611        etsilive_fin_input,             /* fin_input */
612        NULL,                           /* fin_output */
613        etsilive_read_packet,           /* read_packet */
614        etsilive_prepare_packet,        /* prepare_packet */
615        NULL,                           /* fin_packet */
616        NULL,                           /* write_packet */
617        etsilive_get_link_type,         /* get_link_type */
618        NULL,                           /* get_direction */
619        NULL,                           /* set_direction */
620        NULL,                           /* get_erf_timestamp */
621        etsilive_get_timeval,           /* get_timeval */
622        NULL,                           /* get_timespec */
623        NULL,                           /* get_seconds */
624        NULL,                           /* seek_erf */
625        NULL,                           /* seek_timeval */
626        NULL,                           /* seek_seconds */
627        etsilive_get_pdu_length,       /* get_capture_length */
628        etsilive_get_pdu_length,       /* get_wire_length */
629        etsilive_get_framing_length,    /* get_framing_length */
630        NULL,                           /* set_capture_length */
631        NULL,                           /* get_received_packets */
632        NULL,                           /* get_filtered_packets */
633        NULL,                           /* get_dropped_packets */
634        NULL,                           /* get_statistics */
635        NULL,                           /* get_fd */
636        NULL, //trace_event_etsilive,           /* trace_event */
637        NULL,                           /* help */
638        NULL,                           /* next pointer */
639        NON_PARALLEL(true)              /* TODO this can be parallel */
640};
641
642
643void etsilive_constructor(void) {
644        register_format(&etsilive);
645}
Note: See TracBrowser for help on using the repository browser.