Revision c4d9d196
b/block/raw-posix.c | ||
---|---|---|
750 | 750 |
BlockDriverCompletionFunc *cb, void *opaque, int type) |
751 | 751 |
{ |
752 | 752 |
RawPosixAIOData *acb = g_slice_new(RawPosixAIOData); |
753 |
ThreadPool *pool; |
|
753 | 754 |
|
754 | 755 |
acb->bs = bs; |
755 | 756 |
acb->aio_type = type; |
... | ... | |
763 | 764 |
acb->aio_offset = sector_num * 512; |
764 | 765 |
|
765 | 766 |
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); |
766 |
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); |
|
767 |
pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); |
|
768 |
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); |
|
767 | 769 |
} |
768 | 770 |
|
769 | 771 |
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs, |
... | ... | |
1413 | 1415 |
{ |
1414 | 1416 |
BDRVRawState *s = bs->opaque; |
1415 | 1417 |
RawPosixAIOData *acb; |
1418 |
ThreadPool *pool; |
|
1416 | 1419 |
|
1417 | 1420 |
if (fd_open(bs) < 0) |
1418 | 1421 |
return NULL; |
... | ... | |
1424 | 1427 |
acb->aio_offset = 0; |
1425 | 1428 |
acb->aio_ioctl_buf = buf; |
1426 | 1429 |
acb->aio_ioctl_cmd = req; |
1427 |
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); |
|
1430 |
pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); |
|
1431 |
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); |
|
1428 | 1432 |
} |
1429 | 1433 |
|
1430 | 1434 |
#elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__) |
b/block/raw-win32.c | ||
---|---|---|
144 | 144 |
BlockDriverCompletionFunc *cb, void *opaque, int type) |
145 | 145 |
{ |
146 | 146 |
RawWin32AIOData *acb = g_slice_new(RawWin32AIOData); |
147 |
ThreadPool *pool; |
|
147 | 148 |
|
148 | 149 |
acb->bs = bs; |
149 | 150 |
acb->hfile = hfile; |
... | ... | |
157 | 158 |
acb->aio_offset = sector_num * 512; |
158 | 159 |
|
159 | 160 |
trace_paio_submit(acb, opaque, sector_num, nb_sectors, type); |
160 |
return thread_pool_submit_aio(aio_worker, acb, cb, opaque); |
|
161 |
pool = aio_get_thread_pool(bdrv_get_aio_context(bs)); |
|
162 |
return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque); |
|
161 | 163 |
} |
162 | 164 |
|
163 | 165 |
int qemu_ftruncate64(int fd, int64_t length) |
b/include/block/thread-pool.h | ||
---|---|---|
31 | 31 |
ThreadPool *thread_pool_new(struct AioContext *ctx); |
32 | 32 |
void thread_pool_free(ThreadPool *pool); |
33 | 33 |
|
34 |
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, |
|
35 |
BlockDriverCompletionFunc *cb, void *opaque); |
|
36 |
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg); |
|
37 |
void thread_pool_submit(ThreadPoolFunc *func, void *arg); |
|
34 |
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, |
|
35 |
ThreadPoolFunc *func, void *arg, |
|
36 |
BlockDriverCompletionFunc *cb, void *opaque); |
|
37 |
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, |
|
38 |
ThreadPoolFunc *func, void *arg); |
|
39 |
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg); |
|
38 | 40 |
|
39 | 41 |
#endif |
b/tests/test-thread-pool.c | ||
---|---|---|
4 | 4 |
#include "block/thread-pool.h" |
5 | 5 |
#include "block/block.h" |
6 | 6 |
|
7 |
static AioContext *ctx; |
|
8 |
static ThreadPool *pool; |
|
7 | 9 |
static int active; |
8 | 10 |
|
9 | 11 |
typedef struct { |
... | ... | |
38 | 40 |
active--; |
39 | 41 |
} |
40 | 42 |
|
41 |
/* A non-blocking poll of the main AIO context (we cannot use aio_poll |
|
42 |
* because we do not know the AioContext). |
|
43 |
*/ |
|
44 |
static void qemu_aio_wait_nonblocking(void) |
|
45 |
{ |
|
46 |
qemu_notify_event(); |
|
47 |
qemu_aio_wait(); |
|
48 |
} |
|
49 |
|
|
50 | 43 |
/* Wait until all aio and bh activity has finished */ |
51 | 44 |
static void qemu_aio_wait_all(void) |
52 | 45 |
{ |
53 |
while (qemu_aio_wait()) {
|
|
46 |
while (aio_poll(ctx, true)) {
|
|
54 | 47 |
/* Do nothing */ |
55 | 48 |
} |
56 | 49 |
} |
... | ... | |
58 | 51 |
static void test_submit(void) |
59 | 52 |
{ |
60 | 53 |
WorkerTestData data = { .n = 0 }; |
61 |
thread_pool_submit(worker_cb, &data); |
|
54 |
thread_pool_submit(pool, worker_cb, &data);
|
|
62 | 55 |
qemu_aio_wait_all(); |
63 | 56 |
g_assert_cmpint(data.n, ==, 1); |
64 | 57 |
} |
... | ... | |
66 | 59 |
static void test_submit_aio(void) |
67 | 60 |
{ |
68 | 61 |
WorkerTestData data = { .n = 0, .ret = -EINPROGRESS }; |
69 |
data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data); |
|
62 |
data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data, |
|
63 |
done_cb, &data); |
|
70 | 64 |
|
71 | 65 |
/* The callbacks are not called until after the first wait. */ |
72 | 66 |
active = 1; |
... | ... | |
84 | 78 |
active = 1; |
85 | 79 |
data->n = 0; |
86 | 80 |
data->ret = -EINPROGRESS; |
87 |
thread_pool_submit_co(worker_cb, data); |
|
81 |
thread_pool_submit_co(pool, worker_cb, data);
|
|
88 | 82 |
|
89 | 83 |
/* The test continues in test_submit_co, after qemu_coroutine_enter... */ |
90 | 84 |
|
... | ... | |
126 | 120 |
for (i = 0; i < 100; i++) { |
127 | 121 |
data[i].n = 0; |
128 | 122 |
data[i].ret = -EINPROGRESS; |
129 |
thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]); |
|
123 |
thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
|
|
130 | 124 |
} |
131 | 125 |
|
132 | 126 |
active = 100; |
133 | 127 |
while (active > 0) { |
134 |
qemu_aio_wait();
|
|
128 |
aio_poll(ctx, true);
|
|
135 | 129 |
} |
136 | 130 |
for (i = 0; i < 100; i++) { |
137 | 131 |
g_assert_cmpint(data[i].n, ==, 1); |
... | ... | |
154 | 148 |
for (i = 0; i < 100; i++) { |
155 | 149 |
data[i].n = 0; |
156 | 150 |
data[i].ret = -EINPROGRESS; |
157 |
data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i], |
|
151 |
data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
|
|
158 | 152 |
done_cb, &data[i]); |
159 | 153 |
} |
160 | 154 |
|
... | ... | |
162 | 156 |
* run, but do not waste too much time... |
163 | 157 |
*/ |
164 | 158 |
active = 100; |
165 |
qemu_aio_wait_nonblocking(); |
|
159 |
aio_notify(ctx); |
|
160 |
aio_poll(ctx, false); |
|
166 | 161 |
|
167 | 162 |
/* Wait some time for the threads to start, with some sanity |
168 | 163 |
* testing on the behavior of the scheduler... |
... | ... | |
208 | 203 |
|
209 | 204 |
int main(int argc, char **argv) |
210 | 205 |
{ |
211 |
/* These should be removed once each AioContext has its thread pool. |
|
212 |
* The test should create its own AioContext. |
|
213 |
*/ |
|
214 |
qemu_init_main_loop(); |
|
215 |
bdrv_init(); |
|
206 |
int ret; |
|
207 |
|
|
208 |
ctx = aio_context_new(); |
|
209 |
pool = aio_get_thread_pool(ctx); |
|
216 | 210 |
|
217 | 211 |
g_test_init(&argc, &argv, NULL); |
218 | 212 |
g_test_add_func("/thread-pool/submit", test_submit); |
... | ... | |
220 | 214 |
g_test_add_func("/thread-pool/submit-co", test_submit_co); |
221 | 215 |
g_test_add_func("/thread-pool/submit-many", test_submit_many); |
222 | 216 |
g_test_add_func("/thread-pool/cancel", test_cancel); |
223 |
return g_test_run(); |
|
217 |
|
|
218 |
ret = g_test_run(); |
|
219 |
|
|
220 |
aio_context_unref(ctx); |
|
221 |
return ret; |
|
224 | 222 |
} |
b/thread-pool.c | ||
---|---|---|
78 | 78 |
bool stopping; |
79 | 79 |
}; |
80 | 80 |
|
81 |
/* Currently there is only one thread pool instance. */ |
|
82 |
static ThreadPool global_pool; |
|
83 |
|
|
84 | 81 |
static void *worker_thread(void *opaque) |
85 | 82 |
{ |
86 | 83 |
ThreadPool *pool = opaque; |
... | ... | |
239 | 236 |
.cancel = thread_pool_cancel, |
240 | 237 |
}; |
241 | 238 |
|
242 |
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg, |
|
239 |
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool, |
|
240 |
ThreadPoolFunc *func, void *arg, |
|
243 | 241 |
BlockDriverCompletionFunc *cb, void *opaque) |
244 | 242 |
{ |
245 |
ThreadPool *pool = &global_pool; |
|
246 | 243 |
ThreadPoolElement *req; |
247 | 244 |
|
248 | 245 |
req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque); |
... | ... | |
278 | 275 |
qemu_coroutine_enter(co->co, NULL); |
279 | 276 |
} |
280 | 277 |
|
281 |
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg) |
|
278 |
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func, |
|
279 |
void *arg) |
|
282 | 280 |
{ |
283 | 281 |
ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS }; |
284 | 282 |
assert(qemu_in_coroutine()); |
285 |
thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc); |
|
283 |
thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
|
|
286 | 284 |
qemu_coroutine_yield(); |
287 | 285 |
return tpc.ret; |
288 | 286 |
} |
289 | 287 |
|
290 |
void thread_pool_submit(ThreadPoolFunc *func, void *arg) |
|
288 |
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
|
|
291 | 289 |
{ |
292 |
thread_pool_submit_aio(func, arg, NULL, NULL); |
|
290 |
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
|
|
293 | 291 |
} |
294 | 292 |
|
295 | 293 |
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx) |
... | ... | |
354 | 352 |
event_notifier_cleanup(&pool->notifier); |
355 | 353 |
g_free(pool); |
356 | 354 |
} |
357 |
|
|
358 |
static void thread_pool_init(void) |
|
359 |
{ |
|
360 |
thread_pool_init_one(&global_pool, NULL); |
|
361 |
} |
|
362 |
|
|
363 |
block_init(thread_pool_init) |
Also available in: Unified diff