source: lib/data-struct/deque.c @ fac8c46

Branches/tags containing this revision: 4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since fac8c46 was fac8c46, checked in by Richard Sanger <rsangerarj@…>, 8 years ago

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file: any queued packets will still be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter and makes a copy of the packets in result queues when pausing

  • this is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data like in the case of ring:.

Fixed up the re-starting of traces which hadn't been finished to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of checks for consistency if the trace is using existing that are checking started.

Various other bug fixes and tidy ups.

  • Property mode set to 100644
File size: 4.2 KB
Line 
#include "deque.h"

#include <assert.h>
#include <stddef.h>
#include <malloc.h>
#include <stdlib.h>
#include <string.h>
7
8
/* When RACE_SAFE is non-zero, even a plain read of the queue state is done
 * with the lock held, instead of relying on the matching write being atomic.
 * The default of 1 applies only if RACE_SAFE has not been defined already. */
#if !defined(RACE_SAFE)
#  define RACE_SAFE 1
#endif
14
15struct list_node {
16        list_node_t * next;
17        list_node_t * prev;
18        char data[]; // Our item goes here
19};
20
21DLLEXPORT void libtrace_deque_init(libtrace_queue_t * q, size_t element_size)
22{
23        q->head = NULL;
24        q->tail = NULL;
25        q->size = 0;
26        q->element_size = element_size;
27        assert(pthread_mutex_init(&q->lock, NULL) == 0);
28}
29
30DLLEXPORT void libtrace_deque_push_back(libtrace_queue_t *q, void *d)
31{
32        // Do as much work as possible outside the lock
33        list_node_t * new_node = (list_node_t *) malloc(sizeof(list_node_t) + q->element_size);
34        new_node->next = NULL;
35        // Fill it
36        memcpy(&new_node->data, d, q->element_size);
37        // Only ->prev is unknown at this stage to be completed in lock
38        assert(pthread_mutex_lock(&q->lock) == 0);
39        if (q->head == NULL) {
40                assert(q->tail == NULL && q->size == 0);
41                new_node->prev = NULL;
42                q->head = q->tail = new_node;
43        } else {
44                assert (q->tail != NULL);
45                q->tail->next = new_node;
46                new_node->prev = q->tail; // Done the double link
47                q->tail = new_node; // Relink tail
48        }
49        q->size++;
50        assert(pthread_mutex_unlock(&q->lock) == 0);
51}
52
53DLLEXPORT void libtrace_deque_push_front(libtrace_queue_t *q, void *d)
54{
55        // Do as much work as possible outside the lock
56        list_node_t * new_node = (list_node_t *) malloc(sizeof(list_node_t) + q->element_size);
57        new_node->prev = NULL;
58        // Fill it
59        memcpy(&new_node->data, d, q->element_size);
60        // Only ->next is unknown at this stage to be completed in lock
61        assert(pthread_mutex_lock(&q->lock) == 0);
62        if (q->head == NULL) {
63                assert(q->tail == NULL && q->size == 0);
64                new_node->next = NULL;
65                q->head = q->tail = new_node;
66        } else {
67                assert (q->head != NULL);
68                q->head->prev = new_node;
69                new_node->next = q->head; // Done the double link
70                q->head = new_node; // Relink head
71                // Void out the other things
72        }
73        q->size++;
74        assert(pthread_mutex_unlock(&q->lock) == 0);
75}
76
77DLLEXPORT int libtrace_deque_peek_front(libtrace_queue_t *q, void *d)
78{
79        int ret = 1;
80        assert(pthread_mutex_lock(&q->lock) == 0);
81        if (q->head == NULL)
82                ret = 0;
83        else
84                memcpy(d, &q->head->data, q->element_size);
85        assert(pthread_mutex_unlock(&q->lock) == 0);
86        return ret;
87}
88
89DLLEXPORT int libtrace_deque_peek_tail(libtrace_queue_t *q, void *d)
90{
91        int ret = 1;
92        assert(pthread_mutex_lock(&q->lock) == 0);
93        if (q->tail == NULL)
94                ret = 0;
95        else
96                memcpy(d, &q->tail->data, q->element_size);
97        assert(pthread_mutex_unlock(&q->lock) == 0);
98        return ret;
99}
100
101DLLEXPORT int libtrace_deque_pop_front(libtrace_queue_t *q, void *d)
102{
103        int ret = 0;
104        list_node_t * n = NULL;
105        assert(pthread_mutex_lock(&q->lock) == 0);
106        if (q->head != NULL) {
107                n = q->head;
108                ret = 1;
109                q->head = n->next;
110                if (q->head)
111                        q->head->prev = NULL;
112                q->size--;
113                if (q->size <= 1) // Either 1 or 0 items
114                        q->tail = q->head;
115        }
116        assert(pthread_mutex_unlock(&q->lock) == 0);
117        // Unlock once we've removed it :)
118        if (ret) {
119                memcpy(d, &n->data, q->element_size);
120                free(n);
121        }
122        return ret;
123}
124
125DLLEXPORT int libtrace_deque_pop_tail(libtrace_queue_t *q, void *d)
126{
127        int ret = 0;
128        list_node_t * n;
129        assert(pthread_mutex_lock(&q->lock) == 0);
130        if (q->tail != NULL) {
131                n = q->tail;
132                ret = 1;
133                q->tail = n->prev;
134                if (q->tail)
135                        q->tail->next = NULL;
136                q->size--;
137                if (q->size <= 1) // Either 1 or 0 items
138                        q->head = q->tail;
139        }
140        assert(pthread_mutex_unlock(&q->lock) == 0);
141        if (ret) {
142                memcpy(d, &n->data, q->element_size);
143                free(n);
144        }
145        return ret;
146}
147
148DLLEXPORT size_t libtrace_deque_get_size(libtrace_queue_t *q)
149{
150#if RACE_SAFE
151        size_t ret;
152        assert(pthread_mutex_lock(&q->lock) == 0);
153        ret = q->size;
154        assert(pthread_mutex_unlock(&q->lock) == 0);
155        return ret;
156#else
157        return q->size;
158#endif
159}
160
161DLLEXPORT void libtrace_zero_deque(libtrace_queue_t *q)
162{
163        q->head = q->tail = NULL;
164        q->size = q->element_size = 0;
165}
166
167DLLEXPORT void libtrace_deque_apply_function(libtrace_queue_t *q, deque_data_fn fn)
168{
169        list_node_t *n;
170        assert(q->element_size == sizeof(libtrace_result_t));
171        assert(pthread_mutex_lock(&q->lock) == 0);
172        n = q->head;
173        for (n = q->head; n != NULL; n = n->next) {
174                (*fn)(&n->data);
175        }
176        assert(pthread_mutex_unlock(&q->lock) == 0);
177}
Note: See TracBrowser for help on using the repository browser.