source: lib/data-struct/message_queue.c @ fac8c46

4.0.1-hotfixes, cachetimestamps, develop, dpdk-ndag, etsilive, libtrace4, ndag_format, pfring, rc-4.0.1, rc-4.0.2, rc-4.0.3, rc-4.0.4, ringdecrementfix, ringperformance, ringtimestampfixes
Last change on this file since fac8c46 was fac8c46, checked in by Richard Sanger <rsangerarj@…>, 8 years ago

Tidies up the pausing so that it now works as expected and a trace can easily be paused and restarted.
Ensures that packets will not be lost if pause is called on a file; any queued packets will still be read (a message is sent allowing the user to drop these packets if they are unwanted).
Differentiates packets from other results in the queues to the reducer/reporter and makes a copy of the packets in result queues when pausing

  • this is needed to ensure that bad memory isn't referenced if a zero-copy trace is paused by closing sockets/associated data like in the case of ring:.

Fixed up the re-starting of traces which hadn't been finished to account for different configurations.
Adds a 'state' to libtrace to handle the state of parallel traces, rather than hacking around the existing 'started' boolean. Also provides two levels of checks for consistency if the trace is using existing that are checking started.

Various other bug fixes and tidy ups.

  • Property mode set to 100644
File size: 4.6 KB
Line 
1#include "message_queue.h"
2
3#include <unistd.h>
4#include <stdio.h>
5#include <limits.h>
6#include <assert.h>
7
8/**
9 * TODO look into using eventfd instead of a pipe if we have it available XXX
10 */
11
12/**
13 * @param mq A pointer to allocated space for a libtrace message queue
14 * @param message_len The size in bytes of the message item, to ensure thread safety this
15 *                should be less than PIPE_BUF (normally at least 512bytes)
16 *                see: man 7 pipe notes on atomic operations
17 */
18inline void libtrace_message_queue_init(libtrace_message_queue_t *mq, size_t message_len)
19{
20        assert(message_len);
21        assert(pipe(mq->pipefd) != -1);
22        mq->message_count = 0;
23        if (message_len > PIPE_BUF)
24                fprintf(stderr, "Warning message queue wont be atomic (thread safe) message_len(%zu) > PIPE_BUF(%d)\n",
25                                        message_len, PIPE_BUF);
26        mq->message_len = message_len;
27        pthread_spin_init(&mq->spin, 0);
28}
29
30/**
31 * Posts a message to the given message queue.
32 *
33 * This will block if a reader is not keeping up and the underlying pipe
34 * fills up.
35 *
36 * @param mq A pointer to a initilised libtrace message queue structure (NOT NULL)
37 * @param message A pointer to the message data you wish to send
38 * @return A number representing the number of messages already in the queue,
39 *         0 implies a thread was waiting and will read your message, negative
40 *         numbers implies threads are still waiting. Positive implies a backlog
41 *         of messages.
42 */
43inline int libtrace_message_queue_put(libtrace_message_queue_t *mq, const void *message)
44{
45        int ret;
46        assert(mq->message_len);
47        assert(write(mq->pipefd[1], message, mq->message_len) == (int) mq->message_len);
48        // Update after we've written
49        pthread_spin_lock(&mq->spin);
50        ret = ++mq->message_count; // Should be CAS!
51        pthread_spin_unlock(&mq->spin);
52        return ret;
53}
54
55/**
56 * Retrieves a message from the given message queue.
57 *
58 * This will block if a reader is not keeping up and the underlying pipe
59 * fills up.
60 *
61 * @param mq A pointer to a initilised libtrace message queue structure (NOT NULL)
62 * @param message A pointer to the message data you wish to send
63 * @return The number of messages remaining in the queue less any threads waiting,
64 *         0 implies a thread was waiting and will read your message, negative
65 *         numbers implies threads are still waiting. Positive implies a backlog
66 *         of messages.
67 */
68inline int libtrace_message_queue_get(libtrace_message_queue_t *mq, void *message)
69{
70        int ret;
71        // Safely decrease count first - Yes this might make us negative, however thats ok once a write comes in everything will be fine
72        pthread_spin_lock(&mq->spin);
73        ret = mq->message_count--;
74        pthread_spin_unlock(&mq->spin);
75        assert(read(mq->pipefd[0], message, mq->message_len) == (int) mq->message_len);
76        return ret;
77}
78
79/**
80 * Trys to retrieve a message from the given message queue.
81 *
82 * This will not block and instead returns LIBTRACE_MQ_FAILED if
83 * no message is available.
84 *
85 * @param mq A pointer to a initilised libtrace message queue structure (NOT NULL)
86 * @param message A pointer to the message data you wish to send
87 * @return The number of messages remaining in the queue less any threads waiting,
88 *         0 implies a thread was waiting and will read your message, negative
89 *         numbers implies threads are still waiting. Positive implies a backlog
90 *         of messages.
91 */
92inline int libtrace_message_queue_try_get(libtrace_message_queue_t *mq, void *message)
93{
94        int ret;
95        // Safely decrease count first - Yes this might make us negative, however thats ok once a write comes in everything will be fine
96        // ->Fast path avoid the lock
97        if (mq->message_count <= 0)
98                return LIBTRACE_MQ_FAILED;
99        // Else grab lock and confirm this is so
100        pthread_spin_lock(&mq->spin);
101        if (mq->message_count > 0) {
102                ret = --mq->message_count;
103                // :( read(...) needs to be done within the *spin* lock otherwise blocking might steal our read
104                assert(read(mq->pipefd[0], message, mq->message_len) == (int) mq->message_len);
105        } else {
106                ret = LIBTRACE_MQ_FAILED;
107        }
108        pthread_spin_unlock(&mq->spin);
109        return ret;
110}
111
112/**
113 * May be negative if threads blocking and waiting for a message.
114 */
115inline int libtrace_message_queue_count(const libtrace_message_queue_t *mq)
116{
117        // This is only ok because we know int is atomic
118        return mq->message_count;
119}
120
121inline void libtrace_message_queue_destroy(libtrace_message_queue_t *mq)
122{
123        mq->message_count = 0;
124        mq->message_len = 0;
125        close(mq->pipefd[0]);
126        close(mq->pipefd[1]);
127        pthread_spin_destroy(&mq->spin);
128}
129
130/**
131 * @return a file descriptor for the queue, can be used with select() poll() etc.
132 */
133inline int libtrace_message_queue_get_fd(libtrace_message_queue_t *mq)
134{
135        return mq->pipefd[0];
136}
Note: See TracBrowser for help on using the repository browser.