root / tests / test-thread-pool.c @ 17862378
History | View | Annotate | Download (5.6 kB)
1 |
#include <glib.h> |
---|---|
2 |
#include "qemu-common.h" |
3 |
#include "block/aio.h" |
4 |
#include "block/thread-pool.h" |
5 |
#include "block/block.h" |
6 |
|
7 |
/* Number of requests still outstanding; decremented by done_cb as each
 * completion callback fires.  Completion callbacks are serialized in the
 * main AIO context, so plain (non-atomic) accesses suffice there.
 */
static int active;

typedef struct {
    BlockDriverAIOCB *aiocb;  /* in-flight request; done_cb clears it to NULL */
    int n;                    /* times the worker ran; updated with atomic ops */
    int ret;                  /* -EINPROGRESS until a result is recorded */
} WorkerTestData;
14 |
|
15 |
static int worker_cb(void *opaque) |
16 |
{ |
17 |
WorkerTestData *data = opaque; |
18 |
return __sync_fetch_and_add(&data->n, 1); |
19 |
} |
20 |
|
21 |
static int long_cb(void *opaque) |
22 |
{ |
23 |
WorkerTestData *data = opaque; |
24 |
__sync_fetch_and_add(&data->n, 1);
|
25 |
g_usleep(2000000);
|
26 |
__sync_fetch_and_add(&data->n, 1);
|
27 |
return 0; |
28 |
} |
29 |
|
30 |
static void done_cb(void *opaque, int ret) |
31 |
{ |
32 |
WorkerTestData *data = opaque; |
33 |
g_assert_cmpint(data->ret, ==, -EINPROGRESS); |
34 |
data->ret = ret; |
35 |
data->aiocb = NULL;
|
36 |
|
37 |
/* Callbacks are serialized, so no need to use atomic ops. */
|
38 |
active--; |
39 |
} |
40 |
|
41 |
/* A non-blocking poll of the main AIO context (we cannot use aio_poll
 * because we do not know the AioContext).
 * qemu_notify_event() ensures the following qemu_aio_wait() returns
 * promptly rather than blocking for new events.
 */
static void qemu_aio_wait_nonblocking(void)
{
    qemu_notify_event();
    qemu_aio_wait();
}
49 |
|
50 |
/* Keep polling until all pending aio and bottom-half activity has drained. */
static void qemu_aio_wait_all(void)
{
    for (;;) {
        if (!qemu_aio_wait()) {
            break;
        }
    }
}
57 |
|
58 |
static void test_submit(void) |
59 |
{ |
60 |
WorkerTestData data = { .n = 0 };
|
61 |
thread_pool_submit(worker_cb, &data); |
62 |
qemu_aio_wait_all(); |
63 |
g_assert_cmpint(data.n, ==, 1);
|
64 |
} |
65 |
|
66 |
static void test_submit_aio(void) |
67 |
{ |
68 |
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
|
69 |
data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); |
70 |
|
71 |
/* The callbacks are not called until after the first wait. */
|
72 |
active = 1;
|
73 |
g_assert_cmpint(data.ret, ==, -EINPROGRESS); |
74 |
qemu_aio_wait_all(); |
75 |
g_assert_cmpint(active, ==, 0);
|
76 |
g_assert_cmpint(data.n, ==, 1);
|
77 |
g_assert_cmpint(data.ret, ==, 0);
|
78 |
} |
79 |
|
80 |
/* Coroutine body for test_submit_co.  thread_pool_submit_co suspends the
 * coroutine until the worker finishes, so execution interleaves with
 * test_submit_co (see the markers below).
 */
static void co_test_cb(void *opaque)
{
    WorkerTestData *data = opaque;

    active = 1;
    data->n = 0;
    data->ret = -EINPROGRESS;
    thread_pool_submit_co(worker_cb, data);  /* yields back to the caller here */

    /* The test continues in test_submit_co, after qemu_coroutine_enter... */

    g_assert_cmpint(data->n, ==, 1);
    data->ret = 0;
    active--;

    /* The test continues in test_submit_co, after qemu_aio_wait_all... */
}
97 |
|
98 |
/* Submit from inside a coroutine: the coroutine is suspended while the
 * worker runs and is resumed to finish the checks (see co_test_cb).
 */
static void test_submit_co(void)
{
    WorkerTestData data;
    Coroutine *co = qemu_coroutine_create(co_test_cb);

    qemu_coroutine_enter(co, &data);

    /* Back here once the worker has started.  */

    g_assert_cmpint(active, ==, 1);
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);

    /* qemu_aio_wait_all will execute the rest of the coroutine.  */

    qemu_aio_wait_all();

    /* Back here after the coroutine has finished.  */

    g_assert_cmpint(active, ==, 0);
    g_assert_cmpint(data.ret, ==, 0);
}
119 |
|
120 |
static void test_submit_many(void) |
121 |
{ |
122 |
WorkerTestData data[100];
|
123 |
int i;
|
124 |
|
125 |
/* Start more work items than there will be threads. */
|
126 |
for (i = 0; i < 100; i++) { |
127 |
data[i].n = 0;
|
128 |
data[i].ret = -EINPROGRESS; |
129 |
thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); |
130 |
} |
131 |
|
132 |
active = 100;
|
133 |
while (active > 0) { |
134 |
qemu_aio_wait(); |
135 |
} |
136 |
for (i = 0; i < 100; i++) { |
137 |
g_assert_cmpint(data[i].n, ==, 1);
|
138 |
g_assert_cmpint(data[i].ret, ==, 0);
|
139 |
} |
140 |
} |
141 |
|
142 |
/* Cancellation: queue 100 slow jobs, cancel the ones that have not yet
 * started (cheap) and the in-flight ones (blocking), then verify every
 * request either ran to completion or was canceled.
 */
static void test_cancel(void)
{
    WorkerTestData data[100];
    int num_canceled;
    int i;

    /* Start more work items than there will be threads, to ensure
     * the pool is full.
     */
    test_submit_many();

    /* Start long running jobs, to ensure we can cancel some.  */
    for (i = 0; i < 100; i++) {
        data[i].n = 0;
        data[i].ret = -EINPROGRESS;
        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
                                               done_cb, &data[i]);
    }

    /* Starting the threads may be left to a bottom half.  Let it
     * run, but do not waste too much time...
     */
    active = 100;
    qemu_aio_wait_nonblocking();

    /* Wait some time for the threads to start, with some sanity
     * testing on the behavior of the scheduler...
     */
    g_assert_cmpint(active, ==, 100);
    g_usleep(1000000);
    g_assert_cmpint(active, >, 50);

    /* Cancel the jobs that haven't been started yet.  n == 0 means
     * long_cb has not run for that slot; atomically flip it to 3 to
     * mark the request as canceled-before-start.
     */
    num_canceled = 0;
    for (i = 0; i < 100; i++) {
        if (__sync_val_compare_and_swap(&data[i].n, 0, 3) == 0) {
            data[i].ret = -ECANCELED;
            bdrv_aio_cancel(data[i].aiocb);
            active--;
            num_canceled++;
        }
    }
    /* After the one-second sleep, some jobs should have started and
     * some should still have been queued.
     */
    g_assert_cmpint(active, >, 0);
    g_assert_cmpint(num_canceled, <, 100);

    /* Canceling the others will be a blocking operation.  */
    for (i = 0; i < 100; i++) {
        if (data[i].n != 3) {
            bdrv_aio_cancel(data[i].aiocb);
        }
    }

    /* Finish execution and execute any remaining callbacks.  */
    qemu_aio_wait_all();
    g_assert_cmpint(active, ==, 0);
    for (i = 0; i < 100; i++) {
        if (data[i].n == 3) {
            /* Canceled before it started: done_cb never ran, so ret and
             * aiocb keep the values stored above.
             */
            g_assert_cmpint(data[i].ret, ==, -ECANCELED);
            g_assert(data[i].aiocb != NULL);
        } else {
            /* Ran to completion: long_cb bumped n twice and done_cb
             * recorded ret == 0 and cleared aiocb.
             */
            g_assert_cmpint(data[i].n, ==, 2);
            g_assert_cmpint(data[i].ret, ==, 0);
            g_assert(data[i].aiocb == NULL);
        }
    }
}
208 |
|
209 |
int main(int argc, char **argv) |
210 |
{ |
211 |
/* These should be removed once each AioContext has its thread pool.
|
212 |
* The test should create its own AioContext.
|
213 |
*/
|
214 |
qemu_init_main_loop(); |
215 |
bdrv_init(); |
216 |
|
217 |
g_test_init(&argc, &argv, NULL);
|
218 |
g_test_add_func("/thread-pool/submit", test_submit);
|
219 |
g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
|
220 |
g_test_add_func("/thread-pool/submit-co", test_submit_co);
|
221 |
g_test_add_func("/thread-pool/submit-many", test_submit_many);
|
222 |
g_test_add_func("/thread-pool/cancel", test_cancel);
|
223 |
return g_test_run();
|
224 |
} |