Revision c4d9d196

b/block/raw-posix.c
750 750
        BlockDriverCompletionFunc *cb, void *opaque, int type)
751 751
{
752 752
    RawPosixAIOData *acb = g_slice_new(RawPosixAIOData);
753
    ThreadPool *pool;
753 754

  
754 755
    acb->bs = bs;
755 756
    acb->aio_type = type;
......
763 764
    acb->aio_offset = sector_num * 512;
764 765

  
765 766
    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
766
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
767
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
768
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
767 769
}
768 770

  
769 771
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
......
1413 1415
{
1414 1416
    BDRVRawState *s = bs->opaque;
1415 1417
    RawPosixAIOData *acb;
1418
    ThreadPool *pool;
1416 1419

  
1417 1420
    if (fd_open(bs) < 0)
1418 1421
        return NULL;
......
1424 1427
    acb->aio_offset = 0;
1425 1428
    acb->aio_ioctl_buf = buf;
1426 1429
    acb->aio_ioctl_cmd = req;
1427
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
1430
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
1431
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
1428 1432
}
1429 1433

  
1430 1434
#elif defined(__FreeBSD__) || defined(__FreeBSD_kernel__)
b/block/raw-win32.c
144 144
        BlockDriverCompletionFunc *cb, void *opaque, int type)
145 145
{
146 146
    RawWin32AIOData *acb = g_slice_new(RawWin32AIOData);
147
    ThreadPool *pool;
147 148

  
148 149
    acb->bs = bs;
149 150
    acb->hfile = hfile;
......
157 158
    acb->aio_offset = sector_num * 512;
158 159

  
159 160
    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
160
    return thread_pool_submit_aio(aio_worker, acb, cb, opaque);
161
    pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
162
    return thread_pool_submit_aio(pool, aio_worker, acb, cb, opaque);
161 163
}
162 164

  
163 165
int qemu_ftruncate64(int fd, int64_t length)
b/include/block/thread-pool.h
31 31
ThreadPool *thread_pool_new(struct AioContext *ctx);
32 32
void thread_pool_free(ThreadPool *pool);
33 33

  
34
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
35
     BlockDriverCompletionFunc *cb, void *opaque);
36
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
37
void thread_pool_submit(ThreadPoolFunc *func, void *arg);
34
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
35
        ThreadPoolFunc *func, void *arg,
36
        BlockDriverCompletionFunc *cb, void *opaque);
37
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
38
        ThreadPoolFunc *func, void *arg);
39
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
38 40

  
39 41
#endif
b/tests/test-thread-pool.c
4 4
#include "block/thread-pool.h"
5 5
#include "block/block.h"
6 6

  
7
static AioContext *ctx;
8
static ThreadPool *pool;
7 9
static int active;
8 10

  
9 11
typedef struct {
......
38 40
    active--;
39 41
}
40 42

  
41
/* A non-blocking poll of the main AIO context (we cannot use aio_poll
42
 * because we do not know the AioContext).
43
 */
44
static void qemu_aio_wait_nonblocking(void)
45
{
46
    qemu_notify_event();
47
    qemu_aio_wait();
48
}
49

  
50 43
/* Wait until all aio and bh activity has finished */
51 44
static void qemu_aio_wait_all(void)
52 45
{
53
    while (qemu_aio_wait()) {
46
    while (aio_poll(ctx, true)) {
54 47
        /* Do nothing */
55 48
    }
56 49
}
......
58 51
static void test_submit(void)
59 52
{
60 53
    WorkerTestData data = { .n = 0 };
61
    thread_pool_submit(worker_cb, &data);
54
    thread_pool_submit(pool, worker_cb, &data);
62 55
    qemu_aio_wait_all();
63 56
    g_assert_cmpint(data.n, ==, 1);
64 57
}
......
66 59
static void test_submit_aio(void)
67 60
{
68 61
    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
69
    data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
62
    data.aiocb = thread_pool_submit_aio(pool, worker_cb, &data,
63
                                        done_cb, &data);
70 64

  
71 65
    /* The callbacks are not called until after the first wait.  */
72 66
    active = 1;
......
84 78
    active = 1;
85 79
    data->n = 0;
86 80
    data->ret = -EINPROGRESS;
87
    thread_pool_submit_co(worker_cb, data);
81
    thread_pool_submit_co(pool, worker_cb, data);
88 82

  
89 83
    /* The test continues in test_submit_co, after qemu_coroutine_enter... */
90 84

  
......
126 120
    for (i = 0; i < 100; i++) {
127 121
        data[i].n = 0;
128 122
        data[i].ret = -EINPROGRESS;
129
        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
123
        thread_pool_submit_aio(pool, worker_cb, &data[i], done_cb, &data[i]);
130 124
    }
131 125

  
132 126
    active = 100;
133 127
    while (active > 0) {
134
        qemu_aio_wait();
128
        aio_poll(ctx, true);
135 129
    }
136 130
    for (i = 0; i < 100; i++) {
137 131
        g_assert_cmpint(data[i].n, ==, 1);
......
154 148
    for (i = 0; i < 100; i++) {
155 149
        data[i].n = 0;
156 150
        data[i].ret = -EINPROGRESS;
157
        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
151
        data[i].aiocb = thread_pool_submit_aio(pool, long_cb, &data[i],
158 152
                                               done_cb, &data[i]);
159 153
    }
160 154

  
......
162 156
     * run, but do not waste too much time...
163 157
     */
164 158
    active = 100;
165
    qemu_aio_wait_nonblocking();
159
    aio_notify(ctx);
160
    aio_poll(ctx, false);
166 161

  
167 162
    /* Wait some time for the threads to start, with some sanity
168 163
     * testing on the behavior of the scheduler...
......
208 203

  
209 204
int main(int argc, char **argv)
210 205
{
211
    /* These should be removed once each AioContext has its thread pool.
212
     * The test should create its own AioContext.
213
     */
214
    qemu_init_main_loop();
215
    bdrv_init();
206
    int ret;
207

  
208
    ctx = aio_context_new();
209
    pool = aio_get_thread_pool(ctx);
216 210

  
217 211
    g_test_init(&argc, &argv, NULL);
218 212
    g_test_add_func("/thread-pool/submit", test_submit);
......
220 214
    g_test_add_func("/thread-pool/submit-co", test_submit_co);
221 215
    g_test_add_func("/thread-pool/submit-many", test_submit_many);
222 216
    g_test_add_func("/thread-pool/cancel", test_cancel);
223
    return g_test_run();
217

  
218
    ret = g_test_run();
219

  
220
    aio_context_unref(ctx);
221
    return ret;
224 222
}
b/thread-pool.c
78 78
    bool stopping;
79 79
};
80 80

  
81
/* Currently there is only one thread pool instance. */
82
static ThreadPool global_pool;
83

  
84 81
static void *worker_thread(void *opaque)
85 82
{
86 83
    ThreadPool *pool = opaque;
......
239 236
    .cancel             = thread_pool_cancel,
240 237
};
241 238

  
242
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
239
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
240
        ThreadPoolFunc *func, void *arg,
243 241
        BlockDriverCompletionFunc *cb, void *opaque)
244 242
{
245
    ThreadPool *pool = &global_pool;
246 243
    ThreadPoolElement *req;
247 244

  
248 245
    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
......
278 275
    qemu_coroutine_enter(co->co, NULL);
279 276
}
280 277

  
281
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg)
278
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
279
                                       void *arg)
282 280
{
283 281
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
284 282
    assert(qemu_in_coroutine());
285
    thread_pool_submit_aio(func, arg, thread_pool_co_cb, &tpc);
283
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
286 284
    qemu_coroutine_yield();
287 285
    return tpc.ret;
288 286
}
289 287

  
290
void thread_pool_submit(ThreadPoolFunc *func, void *arg)
288
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
291 289
{
292
    thread_pool_submit_aio(func, arg, NULL, NULL);
290
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
293 291
}
294 292

  
295 293
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
......
354 352
    event_notifier_cleanup(&pool->notifier);
355 353
    g_free(pool);
356 354
}
357

  
358
static void thread_pool_init(void)
359
{
360
    thread_pool_init_one(&global_pool, NULL);
361
}
362

  
363
block_init(thread_pool_init)

Also available in: Unified diff