source: lib/format_etsilive.c @ df87f00

cachetimestampsdevelopetsiliverc-4.0.4ringdecrementfixringperformance
Last change on this file since df87f00 was df87f00, checked in by Shane Alcock <salcock@…>, 3 years ago

First bits of code for an ETSI live format

Will probably remove the dependencies on the ASN.1 stuff
soon, but chucking it all in for now just so everything will
compile nicely.

  • Property mode set to 100644
File size: 20.9 KB
Line 
1/*
2 *
3 * Copyright (c) 2007-2017 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libtrace.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libtrace is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libtrace is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27#define _GNU_SOURCE
28
29#include "config.h"
30#include "common.h"
31#include "libtrace.h"
32#include "libtrace_int.h"
33#include "format_helper.h"
34#include "data-struct/simple_circular_buffer.h"
35
36#include "etsiasn1tab.h"
37
38#include <assert.h>
39#include <errno.h>
40#include <fcntl.h>
41#include <stdio.h>
42#include <string.h>
43#include <stdlib.h>
44#include <unistd.h>
45#include <libtasn1.h>
46#include <sys/socket.h>
47#include <sys/types.h>
48#include <netdb.h>
49
50#define ETSI_RECVBUF_SIZE (64 * 1024 * 1024)
51
52#define FORMAT_DATA ((etsilive_format_data_t *)libtrace->format_data)
53
54typedef struct etsipktcache {
55
56        uint64_t timestamp;
57        uint8_t *ccptr;
58        uint8_t *iriptr;
59        uint16_t length;
60
61} etsi_packet_cache_t;
62
63typedef struct etsisocket {
64        int sock;
65        struct sockaddr *srcaddr;
66
67        libtrace_scb_t recvbuffer;
68        etsi_packet_cache_t cached;
69
70} etsisocket_t;
71
72typedef struct etsithread {
73        libtrace_message_queue_t mqueue;
74        etsisocket_t *sources;
75        uint16_t sourcecount;
76        int threadindex;
77} etsithread_t;
78
79typedef struct etsilive_format_data {
80        char *listenport;
81        char *listenaddr;
82
83        pthread_t listenthread;
84        etsithread_t *receivers;
85        int nextthreadid;
86        ASN1_TYPE etsidef;
87} etsilive_format_data_t;
88
89typedef struct newsendermessage {
90        int recvsock;
91        struct sockaddr *recvaddr;
92} newsend_message_t;
93
94
95static void *etsi_listener(void *tdata) {
96        libtrace_t *libtrace = (libtrace_t *)tdata;
97        struct addrinfo hints, *listenai;
98        struct sockaddr_storage *connected;
99        socklen_t addrsize;
100        int sock, consock;
101        int reuse = 1;
102
103        hints.ai_family = PF_UNSPEC;
104        hints.ai_socktype = SOCK_STREAM;
105        hints.ai_flags = AI_PASSIVE;
106        hints.ai_protocol = 0;
107
108        sock = -1;
109        listenai = NULL;
110
111        if (getaddrinfo(FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
112                        &hints, &listenai) != 0) {
113                fprintf(stderr,
114                        "Call to getaddrinfo failed for %s:%s -- %s\n",
115                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
116                        strerror(errno));
117                goto listenerror;
118        }
119
120        sock = socket(listenai->ai_family, listenai->ai_socktype, 0);
121        if (sock < 0) {
122                fprintf(stderr, "Failed to create socket for %s:%s -- %s\n",
123                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
124                        strerror(errno));
125                goto listenerror;
126        }
127
128        if (setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse))
129                        < 0) {
130
131                fprintf(stderr, "Failed to configure socket for %s:%s -- %s\n",
132                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
133                        strerror(errno));
134
135                goto listenerror;
136        }
137
138        if (bind(sock, (struct sockaddr *)listenai->ai_addr,
139                        listenai->ai_addrlen) < 0) {
140
141                fprintf(stderr, "Failed to bind socket for %s:%s -- %s\n",
142                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
143                        strerror(errno));
144                goto listenerror;
145        }
146
147        if (listen(sock, 10) < 0) {
148                fprintf(stderr, "Failed to listen on socket for %s:%s -- %s\n",
149                        FORMAT_DATA->listenaddr, FORMAT_DATA->listenport,
150                        strerror(errno));
151                goto listenerror;
152        }
153
154        freeaddrinfo(listenai);
155
156        /* TODO consider possibility of pausing and resuming? */
157        while ((is_halted(libtrace) == -1)) {
158                newsend_message_t msg;
159                etsithread_t *et;
160
161                /* accept */
162                connected = (struct sockaddr_storage *)malloc(sizeof(struct
163                                sockaddr_storage));
164                addrsize = sizeof(struct sockaddr_storage);
165                consock = accept(sock, (struct sockaddr *)connected,
166                                &addrsize);
167                if (consock < 0) {
168                        fprintf(stderr, "Failed to accept connection on socket for %s:%s -- %s\n",
169                                FORMAT_DATA->listenaddr,
170                                FORMAT_DATA->listenport,
171                                strerror(errno));
172                        free(connected);
173                        goto listenerror;
174                }
175
176                /* if successful, send consock to next available thread */
177                msg.recvsock = consock;
178                msg.recvaddr = (struct sockaddr *)connected;
179                et = &(FORMAT_DATA->receivers[FORMAT_DATA->nextthreadid]);
180                libtrace_message_queue_put(&(et->mqueue), (void *)&msg);
181
182                if (libtrace->perpkt_thread_count > 0) {
183                        FORMAT_DATA->nextthreadid =
184                                ((FORMAT_DATA->nextthreadid + 1) %
185                                libtrace->perpkt_thread_count);
186                }
187        }
188
189        goto listenshutdown;
190
191listenerror:
192        trace_set_err(libtrace, TRACE_ERR_INIT_FAILED,
193                "Unable to create listening socket for etsilive");
194
195listenshutdown:
196        if (sock >= 0) {
197                close(sock);
198        }
199        if (listenai) {
200                freeaddrinfo(listenai);
201        }
202        if (!is_halted(libtrace)) {
203                trace_interrupt();
204        }
205        pthread_exit(NULL);
206}
207
208
209
210static int etsilive_init_input(libtrace_t *libtrace) {
211        char *scan = NULL;
212        char errordesc[ASN1_MAX_ERROR_DESCRIPTION_SIZE];
213
214        libtrace->format_data = (etsilive_format_data_t *)malloc(
215                        sizeof(etsilive_format_data_t));
216
217        FORMAT_DATA->receivers = NULL;
218        FORMAT_DATA->nextthreadid = 0;
219        FORMAT_DATA->listenaddr = NULL;
220
221        /* TODO is there a sensible default port number? */
222        scan = strchr(libtrace->uridata, ':');
223        if (scan == NULL) {
224                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
225                        "Bad etsilive URI. Should be etsilive:<listenaddr>:<port number>");
226                return -1;
227        }
228        FORMAT_DATA->listenaddr = strndup(libtrace->uridata,
229                        (size_t)(scan - libtrace->uridata));
230        FORMAT_DATA->listenport = strdup(scan + 1);
231
232        if (asn1_array2tree(etsili_asn1tab, &(FORMAT_DATA->etsidef),
233                                errordesc) != ASN1_SUCCESS) {
234                trace_set_err(libtrace, TRACE_ERR_BAD_FORMAT,
235                        "Failed to parse ETSI ASN.1 array: %s", errordesc);
236                return -1;
237        }
238
239        return 0;
240}
241
242static int etsilive_fin_input(libtrace_t *libtrace) {
243        if (FORMAT_DATA->receivers) {
244                free(FORMAT_DATA->receivers);
245        }
246
247        if (FORMAT_DATA->listenaddr) {
248                free(FORMAT_DATA->listenaddr);
249        }
250        if (FORMAT_DATA->listenport) {
251                free(FORMAT_DATA->listenport);
252        }
253        asn1_delete_structure(&(FORMAT_DATA->etsidef));
254        free(libtrace->format_data);
255        return 0;
256}
257
258static int etsilive_start_threads(libtrace_t *libtrace, uint32_t maxthreads) {
259        int ret;
260        uint32_t i;
261        /* Configure the set of receiver threads */
262
263        if (FORMAT_DATA->receivers == NULL) {
264                /* What if the number of threads changes between a pause and
265                 * a restart? Can this happen? */
266                FORMAT_DATA->receivers = (etsithread_t *)
267                                malloc(sizeof(etsithread_t) * maxthreads);
268        }
269
270        for (i = 0; i < maxthreads; i++) {
271
272                libtrace_message_queue_init(&(FORMAT_DATA->receivers[i].mqueue),
273                                sizeof(newsend_message_t));
274
275                FORMAT_DATA->receivers[i].sources = NULL;
276                FORMAT_DATA->receivers[i].sourcecount = 0;
277                FORMAT_DATA->receivers[i].threadindex = i;
278
279        }
280
281        /* Start the listening thread */
282        /* TODO consider affinity of this thread? */
283
284        ret = pthread_create(&(FORMAT_DATA->listenthread), NULL,
285                        etsi_listener, libtrace);
286        if (ret != 0) {
287                return -1;
288        }
289        return maxthreads;
290}
291
292static int etsilive_start_input(libtrace_t *libtrace) {
293        return etsilive_start_threads(libtrace, 1);
294}
295
296static void halt_etsi_thread(etsithread_t *receiver) {
297        int i;
298        libtrace_message_queue_destroy(&(receiver->mqueue));
299        if (receiver->sources == NULL)
300                return;
301        for (i = 0; i < receiver->sourcecount; i++) {
302                etsisocket_t src = receiver->sources[i];
303                libtrace_scb_destroy(&(src.recvbuffer));
304                close(src.sock);
305        }
306        free(receiver->sources);
307}
308
309static int etsilive_pause_input(libtrace_t *libtrace) {
310
311        int i;
312        for (i = 0; i < libtrace->perpkt_thread_count; i++) {
313                halt_etsi_thread(&(FORMAT_DATA->receivers[i]));
314        }
315        return 0;
316
317}
318
319static int receiver_read_message(etsithread_t *et) {
320        newsend_message_t msg;
321
322        while (libtrace_message_queue_try_get(&(et->mqueue), (void *)&msg)
323                        != LIBTRACE_MQ_FAILED) {
324                etsisocket_t *esock = NULL;
325
326                if (et->sourcecount == 0) {
327                        et->sources = (etsisocket_t *)malloc(
328                                        sizeof(etsisocket_t) * 10);
329                } else if ((et->sourcecount % 10) == 0) {
330                        et->sources = (etsisocket_t *)realloc(et->sources,
331                                sizeof(etsisocket_t) * (et->sourcecount + 10));
332                }
333
334                esock = &(et->sources[et->sourcecount]);
335                esock->sock = msg.recvsock;
336                esock->srcaddr = msg.recvaddr;
337                libtrace_scb_init(&(esock->recvbuffer), ETSI_RECVBUF_SIZE,
338                                et->threadindex);
339                esock->cached.timestamp = 0;
340                esock->cached.length = 0;
341                esock->cached.iriptr = NULL;
342                esock->cached.ccptr = NULL;
343
344                et->sourcecount += 1;
345
346                fprintf(stderr, "Thread %d is now handling %u sources.\n",
347                                et->threadindex, et->sourcecount);
348        }
349        return 1;
350}
351
352static void receive_from_single_socket(etsisocket_t *esock) {
353
354        int ret = 0;
355
356        if (esock->sock == -1) {
357                return;
358        }
359
360        ret = libtrace_scb_recv_sock(&(esock->recvbuffer), esock->sock,
361                        MSG_DONTWAIT);
362        if (ret == -1) {
363                if (errno == EAGAIN || errno == EWOULDBLOCK) {
364                        /* Would have blocked, nothing available */
365                        return;
366                }
367                fprintf(stderr, "Error receiving on socket %d: %s\n",
368                                esock->sock, strerror(errno));
369                close(esock->sock);
370                esock->sock = -1;
371        }
372
373        if (ret == 0) {
374                fprintf(stderr, "Socket %d has disconnected\n", esock->sock);
375                close(esock->sock);
376                esock->sock = -1;
377        }
378
379}
380
381static int receive_etsi_sockets(libtrace_t *libtrace, etsithread_t *et) {
382
383        int iserr = 0;
384        int i;
385
386        if ((iserr = is_halted(libtrace)) != -1) {
387                return iserr;
388        }
389
390        iserr = receiver_read_message(et);
391        if (iserr <= 0) {
392                return iserr;
393        }
394
395        if (et->sourcecount == 0) {
396                return 1;
397        }
398
399        for (i = 0; i < et->sourcecount; i++) {
400                receive_from_single_socket(&(et->sources[i]));
401        }
402        return 1;
403
404}
405
406static etsisocket_t *select_next_packet(etsithread_t *et, libtrace_t *libtrace) {
407
408        int i, asnret;
409        etsisocket_t *esock = NULL;
410        char errordesc[ASN1_MAX_ERROR_DESCRIPTION_SIZE];
411        char readstring[100];
412        uint64_t earliest = 0;
413        uint64_t currentts = 0;
414        uint32_t available;
415        uint8_t *ptr = NULL;
416        ASN1_TYPE psheader;
417        int ider_len;
418        int readlen;
419
420        for (i = 0; i < et->sourcecount; i++) {
421                if (et->sources[i].sock == -1) {
422                        continue;
423                }
424                /* Have we already successfully decoded this? Cool,
425                 * just use whatever we cached last time.
426                 */
427                if (et->sources[i].cached.timestamp != 0) {
428                        currentts = et->sources[i].cached.timestamp;
429
430                        if (earliest == 0 || earliest > currentts) {
431                                earliest = currentts;
432                                esock = &(et->sources[i]);
433                        }
434                        continue;
435                }
436
437                ptr = libtrace_scb_get_read(&(et->sources[i].recvbuffer),
438                                &available);
439
440                if (available == 0) {
441                        continue;
442                }
443
444                /* Try to decode whatever is at the front of the buffer. */
445                asnret = asn1_create_element(FORMAT_DATA->etsidef,
446                                "LI-PS-PDU.PSHeader", &psheader);
447                if (asnret != ASN1_SUCCESS) {
448                        fprintf(stderr, "failed to create asn1 element\n");
449                        asn1_delete_structure(&psheader);
450                        continue;
451                }
452
453                ider_len = (int)available;
454                asnret = asn1_der_decoding2(&psheader, ptr, &ider_len, 0, errordesc);
455
456                /* Failed? Must not have the whole packet... */
457                if (asnret != ASN1_SUCCESS) {
458                        int j;
459                        for (j = 0; j < available; j++) {
460                                printf("%02x ", ptr[j]);
461                                if (j % 16 == 15 && j > 0) {
462                                        printf("\n");
463                                }
464                                if (j >= 16 * 16)
465                                        break;
466                        }
467                        fprintf(stderr, "%d failed to decode asn1 content: %s\n",
468                                        available, errordesc);
469                        assert(0);
470                        asn1_delete_structure(&psheader);
471                        continue;
472                }
473
474                readlen = sizeof(readstring);
475                asnret = asn1_read_value(psheader, "timeStamp", readstring, &readlen);
476
477                if (asnret == ASN1_SUCCESS) {
478                        fprintf(stderr, "timeStamp=%s\n", readstring);
479                        /* TODO turn to 64 bit timestamp */
480                } else {
481                        int msts_sec;
482                        int msts_ms;
483
484                        readlen = sizeof(int);
485                        asnret = asn1_read_value(psheader, "microSecondTimeStamp.seconds", &msts_sec, &readlen);
486
487                        if (asnret != ASN1_SUCCESS) {
488                                fprintf(stderr, "no microSecondTimeStamp.seconds\n");
489                                continue;
490                        }
491
492                        readlen = sizeof(int);
493                        asnret = asn1_read_value(psheader, "microSecondTimeStamp.microSeconds", &msts_ms, &readlen);
494
495                        if (asnret != ASN1_SUCCESS) {
496                                fprintf(stderr, "no microSecondTimeStamp.microSeconds?\n");
497                                continue;
498                        }
499                        fprintf(stderr, "microSecondTimeStamp=%d.%d\n", msts_sec, msts_ms);
500                }
501                assert(0);
502                /* Success, cache everything we used so we don't have to
503                 * decode this packet again.
504                 */
505
506                /* Advance the read pointer for this buffer */
507
508                /* Don't forget to update earliest and esock... */
509
510        }
511        return esock;
512}
513
514static int etsilive_prepare_received(libtrace_t *libtrace, etsithread_t *et,
515                etsisocket_t *esock, libtrace_packet_t *packet) {
516        return 1;
517}
518
519
520static int etsilive_read_packet(libtrace_t *libtrace,
521                libtrace_packet_t *packet) {
522
523        etsisocket_t *nextavail = NULL;
524        int ret;
525
526        while (1) {
527                /* Read from sockets for any buffers that do not have
528                 * a complete packet */
529                ret = receive_etsi_sockets(libtrace,
530                                &(FORMAT_DATA->receivers[0]));
531                if (ret <= 0) {
532                        return ret;
533                }
534
535                nextavail = select_next_packet(&(FORMAT_DATA->receivers[0]),
536                                libtrace);
537                if (nextavail == NULL) {
538                        /* No complete packets available, take a short
539                         * break before trying again. */
540                        if (FORMAT_DATA->receivers[0].sourcecount == 0) {
541                                /* No sources yet, so we can wait a bit
542                                 * longer. */
543                                usleep(10000);
544                        } else {
545                                usleep(100);
546                        }
547                        continue;
548                }
549                break;
550        }
551
552        return etsilive_prepare_received(libtrace,
553                        &(FORMAT_DATA->receivers[0]), nextavail,
554                        packet);
555}
556
557static int etsilive_prepare_packet(libtrace_t *libtrace UNUSED,
558                libtrace_packet_t *packet UNUSED,
559                void *buffer UNUSED, libtrace_rt_types_t rt_type UNUSED,
560                uint32_t flags UNUSED) {
561        return 0;
562}
563
564
565
566
567static struct libtrace_format_t etsilive = {
568        "etsilive",
569        "$Id$",
570        TRACE_FORMAT_ETSILIVE,
571        NULL,                           /* probe filename */
572        NULL,                           /* probe magic */
573        etsilive_init_input,            /* init_input */
574        NULL,                           /* config_input */
575        etsilive_start_input,           /* staetsilive_input */
576        etsilive_pause_input,           /* pause */
577        NULL,                           /* init_output */
578        NULL,                           /* config_output */
579        NULL,                           /* staetsilive_output */
580        etsilive_fin_input,             /* fin_input */
581        NULL,                           /* fin_output */
582        etsilive_read_packet,           /* read_packet */
583        etsilive_prepare_packet,        /* prepare_packet */
584        NULL,                           /* fin_packet */
585        NULL,                           /* write_packet */
586        NULL, //etsilive_get_link_type,         /* get_link_type */
587        NULL,                           /* get_direction */
588        NULL,                           /* set_direction */
589        NULL,                           /* get_erf_timestamp */
590        NULL,                           /* get_timeval */
591        NULL,                           /* get_timespec */
592        NULL,                           /* get_seconds */
593        NULL,                           /* seek_erf */
594        NULL,                           /* seek_timeval */
595        NULL,                           /* seek_seconds */
596        NULL, //etsilive_get_capture_length,    /* get_capture_length */
597        NULL, //etsilive_get_wire_length,       /* get_wire_length */
598        NULL, //etsilive_get_framing_length,    /* get_framing_length */
599        NULL,                           /* set_capture_length */
600        NULL,                           /* get_received_packets */
601        NULL,                           /* get_filtered_packets */
602        NULL,                           /* get_dropped_packets */
603        NULL,                           /* get_statistics */
604        NULL,                           /* get_fd */
605        NULL, //trace_event_etsilive,           /* trace_event */
606        NULL,                           /* help */
607        NULL,                           /* next pointer */
608        NON_PARALLEL(true)              /* TODO this can be parallel */
609};
610
611
612void etsilive_constructor(void) {
613        register_format(&etsilive);
614}
Note: See TracBrowser for help on using the repository browser.