source: lib/format_etsilive.c @ b663d33

cachetimestampsdevelopetsiliverc-4.0.4ringdecrementfixringperformance
Last change on this file since b663d33 was b663d33, checked in by Shane Alcock <salcock@…>, 3 years ago

etsilive format is now functional (for single-threaded only).

Packets are decoded using libwandder. The ETSI headers are treated
as a meta-data layer.

Libpacketdump support has also been added for all fields and
structures that libwandder understands.

  • Property mode set to 100644
File size: 20.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "data-struct/simple_circular_buffer.h"
35
36#include <libwandder.h>
37#include <libwandder_etsili.h>
38
39#include <assert.h>
40#include <errno.h>
41#include <fcntl.h>
42#include <stdio.h>
43#include <string.h>
44#include <stdlib.h>
45#include <unistd.h>
46#include <sys/socket.h>
47#include <sys/types.h>
48#include <netdb.h>
49
50#define ETSI_RECVBUF_SIZE (64 * 1024 * 1024)
51
52#define FORMAT_DATA ((etsilive_format_data_t *)libtrace->format_data)
53
54typedef struct etsipktcache {
55
56        uint64_t timestamp;
57        uint16_t length;
58
59} etsi_packet_cache_t;
60
61typedef struct etsisocket {
62        int sock;
63        struct sockaddr *srcaddr;
64
65        libtrace_scb_t recvbuffer;
66        etsi_packet_cache_t cached;
67
68} etsisocket_t;
69
70typedef struct etsithread {
71        libtrace_message_queue_t mqueue;
72        etsisocket_t *sources;
73        uint16_t sourcecount;
74        int threadindex;
75} etsithread_t;
76
77typedef struct etsilive_format_data {
78        char *listenport;
79        char *listenaddr;
80
81        pthread_t listenthread;
82        etsithread_t *receivers;
83        int nextthreadid;
84} etsilive_format_data_t;
85
86typedef struct newsendermessage {
87        int recvsock;
88        struct sockaddr *recvaddr;
89} newsend_message_t;
90
91
92static void *etsi_listener(void *tdata) {
93        libtrace_t *libtrace = (libtrace_t *)tdata;
94        struct addrinfo hints, *listenai;
95        struct sockaddr_storage *connected;
96        socklen_t addrsize;
97        int sock, consock;
98        int reuse = 1;
99
100        hints.ai_family = PF_UNSPEC;
101        hints.ai_socktype = SOCK_STREAM;
102        hints.ai_flags = AI_PASSIVE;
103        hints.ai_protocol = 0;
104
105        sock = -1;
106        listenai = NULL;
107
108        if (getaddrinfo(FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
109                        &hints, &listenai) != 0) {
110                fprintf(stderr,
111                        "Call to getaddrinfo failed for %s:%s -- %s\n",
112                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
113                        strerror(errno));
114                goto listenerror;
115        }
116
117        sock = socket(listenai->ai_family, listenai->ai_socktype, 0);
118        if (sock < 0) {
119                fprintf(stderr, "Failed to create socket for %s:%s -- %s\n",
120                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
121                        strerror(errno));
122                goto listenerror;
123        }
124
125        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
126                        < 0) {
127
128                fprintf(stderr, "Failed to configure socket for %s:%s -- %s\n",
129                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
130                        strerror(errno));
131
132                goto listenerror;
133        }
134
135        if (bind(sock, (struct sockaddr *)listenai->ai_addr,
136                        listenai->ai_addrlen) < 0) {
137
138                fprintf(stderr, "Failed to bind socket for %s:%s -- %s\n",
139                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
140                        strerror(errno));
141                goto listenerror;
142        }
143
144        if (listen(sock, 10) < 0) {
145                fprintf(stderr, "Failed to listen on socket for %s:%s -- %s\n",
146                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
147                        strerror(errno));
148                goto listenerror;
149        }
150
151        freeaddrinfo(listenai);
152
153        /* TODO consider possibility of pausing and resuming? */
154        while ((is_halted(libtrace) == -1)) {
155                newsend_message_t msg;
156                etsithread_t *et;
157
158                /* accept */
159                connected = (struct sockaddr_storage *)malloc(sizeof(struct
160                                sockaddr_storage));
161                addrsize = sizeof(struct sockaddr_storage);
162                consock = accept(sock, (struct sockaddr *)connected,
163                                &addrsize);
164                if (consock < 0) {
165                        fprintf(stderr, "Failed to accept connection on socket for %s:%s -- %s\n",
166                                FORMAT_DATA->listenaddr,
167                                FORMAT_DATA->listenport,
168                                strerror(errno));
169                        free(connected);
170                        goto listenerror;
171                }
172
173                /* if successful, send consock to next available thread */
174                msg.recvsock = consock;
175                msg.recvaddr = (struct sockaddr *)connected;
176                et = &(FORMAT_DATA->receivers[FORMAT_DATA->nextthreadid]);
177                libtrace_message_queue_put(&(et->mqueue), (void *)&msg);
178
179                if (libtrace->perpkt_thread_count > 0) {
180                        FORMAT_DATA->nextthreadid =
181                                ((FORMAT_DATA->nextthreadid + 1) %
182                                libtrace->perpkt_thread_count);
183                }
184        }
185
186        goto listenshutdown;
187
188listenerror:
189        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
190                "Unable to create listening socket for etsilive");
191
192listenshutdown:
193        if (sock >= 0) {
194                close(sock);
195        }
196        if (listenai) {
197                freeaddrinfo(listenai);
198        }
199        if (!is_halted(libtrace)) {
200                trace_interrupt();
201        }
202        pthread_exit(NULL);
203}
204
205
206
207static int etsilive_init_input(libtrace_t *libtrace) {
208        char *scan = NULL;
209        libtrace->format_data = (etsilive_format_data_t *)malloc(
210                        sizeof(etsilive_format_data_t));
211
212        FORMAT_DATA->receivers = NULL;
213        FORMAT_DATA->nextthreadid = 0;
214        FORMAT_DATA->listenaddr = NULL;
215
216        /* TODO is there a sensible default port number? */
217        scan = strchr(libtrace->uridata, ':');
218        if (scan == NULL) {
219                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
220                        "Bad etsilive URI. Should be etsilive:<listenaddr>:<port number>");
221                return -1;
222        }
223        FORMAT_DATA->listenaddr = strndup(libtrace->uridata,
224                        (size_t)(scan - libtrace->uridata));
225        FORMAT_DATA->listenport = strdup(scan + 1);
226
227        return 0;
228}
229
230static int etsilive_fin_input(libtrace_t *libtrace) {
231        if (FORMAT_DATA->receivers) {
232                free(FORMAT_DATA->receivers);
233        }
234
235        if (FORMAT_DATA->listenaddr) {
236                free(FORMAT_DATA->listenaddr);
237        }
238        if (FORMAT_DATA->listenport) {
239                free(FORMAT_DATA->listenport);
240        }
241        free(libtrace->format_data);
242        return 0;
243}
244
245static int etsilive_start_threads(libtrace_t *libtrace, uint32_t maxthreads) {
246        int ret;
247        uint32_t i;
248        /* Configure the set of receiver threads */
249
250        if (FORMAT_DATA->receivers == NULL) {
251                /* What if the number of threads changes between a pause and
252                 * a restart? Can this happen? */
253                FORMAT_DATA->receivers = (etsithread_t *)
254                                malloc(sizeof(etsithread_t) * maxthreads);
255        }
256
257        for (i = 0; i < maxthreads; i++) {
258
259                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
260                                sizeof(newsend_message_t));
261
262                FORMAT_DATA->receivers[i].sources = NULL;
263                FORMAT_DATA->receivers[i].sourcecount = 0;
264                FORMAT_DATA->receivers[i].threadindex = i;
265
266        }
267
268        /* Start the listening thread */
269        /* TODO consider affinity of this thread? */
270
271        ret = pthread_create(&(FORMAT_DATA->listenthread), NULL,
272                        etsi_listener, libtrace);
273        if (ret != 0) {
274                return -1;
275        }
276        return maxthreads;
277}
278
279static int etsilive_start_input(libtrace_t *libtrace) {
280        return etsilive_start_threads(libtrace, 1);
281}
282
283static void halt_etsi_thread(etsithread_t *receiver) {
284        int i;
285        libtrace_message_queue_destroy(&(receiver->mqueue));
286        if (receiver->sources == NULL)
287                return;
288        for (i = 0; i < receiver->sourcecount; i++) {
289                etsisocket_t src = receiver->sources[i];
290                libtrace_scb_destroy(&(src.recvbuffer));
291                close(src.sock);
292        }
293        free(receiver->sources);
294}
295
296static int etsilive_pause_input(libtrace_t *libtrace) {
297
298        int i;
299        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
300                halt_etsi_thread(&(FORMAT_DATA->receivers[i]));
301        }
302        return 0;
303
304}
305
306static int receiver_read_message(etsithread_t *et) {
307        newsend_message_t msg;
308
309        while (libtrace_message_queue_try_get(&(et->mqueue), (void *)&msg)
310                        != LIBTRACE_MQ_FAILED) {
311                etsisocket_t *esock = NULL;
312
313                if (et->sourcecount == 0) {
314                        et->sources = (etsisocket_t *)malloc(
315                                        sizeof(etsisocket_t) * 10);
316                } else if ((et->sourcecount % 10) == 0) {
317                        et->sources = (etsisocket_t *)realloc(et->sources,
318                                sizeof(etsisocket_t) * (et->sourcecount + 10));
319                }
320
321                esock = &(et->sources[et->sourcecount]);
322                esock->sock = msg.recvsock;
323                esock->srcaddr = msg.recvaddr;
324                libtrace_scb_init(&(esock->recvbuffer), ETSI_RECVBUF_SIZE,
325                                et->threadindex);
326                esock->cached.timestamp = 0;
327                esock->cached.length = 0;
328
329                et->sourcecount += 1;
330
331                fprintf(stderr, "Thread %d is now handling %u sources.\n",
332                                et->threadindex, et->sourcecount);
333        }
334        return 1;
335}
336
337static void receive_from_single_socket(etsisocket_t *esock) {
338
339        int ret = 0;
340
341        if (esock->sock == -1) {
342                return;
343        }
344
345        ret = libtrace_scb_recv_sock(&(esock->recvbuffer), esock->sock,
346                        MSG_DONTWAIT);
347        if (ret == -1) {
348                if (errno == EAGAIN || errno == EWOULDBLOCK) {
349                        /* Would have blocked, nothing available */
350                        return;
351                }
352                fprintf(stderr, "Error receiving on socket %d: %s\n",
353                                esock->sock, strerror(errno));
354                close(esock->sock);
355                esock->sock = -1;
356        }
357
358        if (ret == 0) {
359                fprintf(stderr, "Socket %d has disconnected\n", esock->sock);
360                close(esock->sock);
361                esock->sock = -1;
362        }
363
364}
365
366static int receive_etsi_sockets(libtrace_t *libtrace, etsithread_t *et) {
367
368        int iserr = 0;
369        int i;
370
371        if ((iserr = is_halted(libtrace)) != -1) {
372                return iserr;
373        }
374
375        iserr = receiver_read_message(et);
376        if (iserr <= 0) {
377                return iserr;
378        }
379
380        if (et->sourcecount == 0) {
381                return 1;
382        }
383
384        for (i = 0; i < et->sourcecount; i++) {
385                receive_from_single_socket(&(et->sources[i]));
386        }
387        return 1;
388
389}
390
391static etsisocket_t *select_next_packet(etsithread_t *et, libtrace_t *libtrace) {
392
393        int i;
394        etsisocket_t *esock = NULL;
395        uint64_t earliest = 0;
396        uint64_t current;
397        struct timeval tv;
398        uint32_t available;
399        uint8_t *ptr = NULL;
400        wandder_decoder_t dec;
401        uint32_t reclen = 0;
402
403        for (i = 0; i < et->sourcecount; i++) {
404                if (et->sources[i].sock == -1) {
405                        continue;
406                }
407                /* Have we already successfully decoded this? Cool,
408                 * just use whatever we cached last time.
409                 */
410                if (et->sources[i].cached.timestamp != 0) {
411                        current = et->sources[i].cached.timestamp;
412
413                        if (earliest == 0 || earliest > current) {
414                                earliest = current;
415                                esock = &(et->sources[i]);
416                        }
417                        continue;
418                }
419
420                ptr = libtrace_scb_get_read(&(et->sources[i].recvbuffer),
421                                &available);
422
423                if (available == 0 || ptr == NULL) {
424                        continue;
425                }
426
427                init_wandder_decoder(&dec, ptr, available, false);
428                if (et->sources[i].cached.length != 0) {
429                        reclen = et->sources[i].cached.length;
430                } else {
431                        reclen = wandder_etsili_get_pdu_length(&dec);
432
433                        if (reclen == 0) {
434                                free_wandder_decoder(&dec);
435                                continue;
436                        }
437                }
438
439                if (available < reclen) {
440                        /* Don't have the whole PDU yet */
441                        free_wandder_decoder(&dec);
442                        continue;
443                }
444
445                /* Get the timestamp */
446
447                tv = wandder_etsili_get_header_timestamp(&dec);
448                if (tv.tv_sec == 0) {
449                        free_wandder_decoder(&dec);
450                        continue;
451                }
452                current = ((((uint64_t)tv.tv_sec) << 32) +
453                                (((uint64_t)tv.tv_usec << 32)/1000000)); 
454
455                /* Success, cache everything we used so we don't have to
456                 * decode this packet again.
457                 */
458                et->sources[i].cached.timestamp = current;
459                et->sources[i].cached.length = reclen;
460
461
462                /* Don't forget to update earliest and esock... */
463                if (current < earliest || earliest == 0) {
464                        esock = &(et->sources[i]);
465                        earliest = current;
466                }
467
468        }
469        return esock;
470}
471
472static int etsilive_prepare_received(libtrace_t *libtrace, etsithread_t *et,
473                etsisocket_t *esock, libtrace_packet_t *packet) {
474
475        uint32_t available = 0;
476
477        packet->trace = libtrace;
478        packet->buffer = libtrace_scb_get_read(&(esock->recvbuffer),
479                                        &available);
480        packet->buf_control = TRACE_CTRL_EXTERNAL;
481        packet->header = NULL;          // Check this is ok to do
482        packet->payload = packet->buffer;
483        packet->type = TRACE_RT_DATA_ETSILI;
484        packet->order = esock->cached.timestamp;
485        packet->error = esock->cached.length;
486
487        packet->wire_length = esock->cached.length;
488        packet->capture_length = esock->cached.length;
489
490        /* Advance the read pointer for this buffer
491         * TODO should really do this in fin_packet, but will need a ref
492         * to esock to do this properly */
493        libtrace_scb_advance_read(&(esock->recvbuffer), esock->cached.length);
494        esock->cached.length = 0;
495        esock->cached.timestamp = 0;
496
497
498        return 1;
499}
500
501
502static int etsilive_read_packet(libtrace_t *libtrace,
503                libtrace_packet_t *packet) {
504
505        etsisocket_t *nextavail = NULL;
506        int ret;
507
508        while (1) {
509                /* Read from sockets for any buffers that do not have
510                 * a complete packet */
511                ret = receive_etsi_sockets(libtrace,
512                                &(FORMAT_DATA->receivers[0]));
513                if (ret <= 0) {
514                        return ret;
515                }
516
517                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]),
518                                libtrace);
519                if (nextavail == NULL) {
520                        /* No complete packets available, take a short
521                         * break before trying again. */
522                        if (FORMAT_DATA->receivers[0].sourcecount == 0) {
523                                /* No sources yet, so we can wait a bit
524                                 * longer. */
525                                usleep(10000);
526                        } else {
527                                usleep(100);
528                        }
529                        continue;
530                }
531                break;
532        }
533
534        return etsilive_prepare_received(libtrace,
535                        &(FORMAT_DATA->receivers[0]), nextavail,
536                        packet);
537}
538
539static int etsilive_prepare_packet(libtrace_t *libtrace UNUSED,
540                libtrace_packet_t *packet UNUSED,
541                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
542                uint32_t flags UNUSED) {
543        return 0;
544}
545
546static int etsilive_get_pdu_length(const libtrace_packet_t *packet) {
547
548        /* Should never get here because cache is set when packet is read */
549        wandder_decoder_t dec;
550        size_t reclen;
551
552        /* 0 should be ok here for quickly evaluating the first length
553         * field... */
554        init_wandder_decoder(&dec, packet->buffer, 0, false);
555        reclen = (size_t)wandder_etsili_get_pdu_length(&dec);
556
557        free_wandder_decoder(&dec);
558
559        return reclen;
560}
561
562static int etsilive_get_framing_length(const libtrace_packet_t *packet) {
563
564        return 0;
565}
566
567static struct timeval etsilive_get_timeval(const libtrace_packet_t *packet) {
568        /* TODO add cached timestamps to libtrace so we don't have to look
569         * this up again. */
570        struct timeval tv;
571        wandder_decoder_t dec;
572
573        tv.tv_sec = 0;
574        tv.tv_usec = 0;
575
576        init_wandder_decoder(&dec, packet->buffer, 0, false);
577        tv = wandder_etsili_get_header_timestamp(&dec);
578        free_wandder_decoder(&dec);
579        return tv;
580}
581
582static libtrace_linktype_t etsilive_get_link_type(
583                const libtrace_packet_t *packet) {
584        return TRACE_TYPE_ETSILI;
585}
586
587static struct libtrace_format_t etsilive = {
588        "etsilive",
589        "$Id$",
590        TRACE_FORMAT_ETSILIVE,
591        NULL,                           /* probe filename */
592        NULL,                           /* probe magic */
593        etsilive_init_input,            /* init_input */
594        NULL,                           /* config_input */
595        etsilive_start_input,           /* staetsilive_input */
596        etsilive_pause_input,           /* pause */
597        NULL,                           /* init_output */
598        NULL,                           /* config_output */
599        NULL,                           /* staetsilive_output */
600        etsilive_fin_input,             /* fin_input */
601        NULL,                           /* fin_output */
602        etsilive_read_packet,           /* read_packet */
603        etsilive_prepare_packet,        /* prepare_packet */
604        NULL,                           /* fin_packet */
605        NULL,                           /* write_packet */
606        etsilive_get_link_type,         /* get_link_type */
607        NULL,                           /* get_direction */
608        NULL,                           /* set_direction */
609        NULL,                           /* get_erf_timestamp */
610        etsilive_get_timeval,           /* get_timeval */
611        NULL,                           /* get_timespec */
612        NULL,                           /* get_seconds */
613        NULL,                           /* seek_erf */
614        NULL,                           /* seek_timeval */
615        NULL,                           /* seek_seconds */
616        etsilive_get_pdu_length,       /* get_capture_length */
617        etsilive_get_pdu_length,       /* get_wire_length */
618        etsilive_get_framing_length,    /* get_framing_length */
619        NULL,                           /* set_capture_length */
620        NULL,                           /* get_received_packets */
621        NULL,                           /* get_filtered_packets */
622        NULL,                           /* get_dropped_packets */
623        NULL,                           /* get_statistics */
624        NULL,                           /* get_fd */
625        NULL, //trace_event_etsilive,           /* trace_event */
626        NULL,                           /* help */
627        NULL,                           /* next pointer */
628        NON_PARALLEL(true)              /* TODO this can be parallel */
629};
630
631
632void etsilive_constructor(void) {
633        register_format(&etsilive);
634}
Note: See TracBrowser for help on using the repository browser.