Statistics
| Branch: | Revision:

root / tests / test-thread-pool.c @ 17862378

History | View | Annotate | Download (5.6 kB)

1
#include <glib.h>
2
#include "qemu-common.h"
3
#include "block/aio.h"
4
#include "block/thread-pool.h"
5
#include "block/block.h"
6

    
7
static int active;
8

    
9
typedef struct {
10
    BlockDriverAIOCB *aiocb;
11
    int n;
12
    int ret;
13
} WorkerTestData;
14

    
15
static int worker_cb(void *opaque)
16
{
17
    WorkerTestData *data = opaque;
18
    return __sync_fetch_and_add(&data->n, 1);
19
}
20

    
21
static int long_cb(void *opaque)
22
{
23
    WorkerTestData *data = opaque;
24
    __sync_fetch_and_add(&data->n, 1);
25
    g_usleep(2000000);
26
    __sync_fetch_and_add(&data->n, 1);
27
    return 0;
28
}
29

    
30
static void done_cb(void *opaque, int ret)
31
{
32
    WorkerTestData *data = opaque;
33
    g_assert_cmpint(data->ret, ==, -EINPROGRESS);
34
    data->ret = ret;
35
    data->aiocb = NULL;
36

    
37
    /* Callbacks are serialized, so no need to use atomic ops.  */
38
    active--;
39
}
40

    
41
/* A non-blocking poll of the main AIO context (we cannot use aio_poll
42
 * because we do not know the AioContext).
43
 */
44
static void qemu_aio_wait_nonblocking(void)
45
{
46
    qemu_notify_event();
47
    qemu_aio_wait();
48
}
49

    
50
/* Wait until all aio and bh activity has finished */
51
static void qemu_aio_wait_all(void)
52
{
53
    while (qemu_aio_wait()) {
54
        /* Do nothing */
55
    }
56
}
57

    
58
static void test_submit(void)
59
{
60
    WorkerTestData data = { .n = 0 };
61
    thread_pool_submit(worker_cb, &data);
62
    qemu_aio_wait_all();
63
    g_assert_cmpint(data.n, ==, 1);
64
}
65

    
66
static void test_submit_aio(void)
67
{
68
    WorkerTestData data = { .n = 0, .ret = -EINPROGRESS };
69
    data.aiocb = thread_pool_submit_aio(worker_cb, &data, done_cb, &data);
70

    
71
    /* The callbacks are not called until after the first wait.  */
72
    active = 1;
73
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
74
    qemu_aio_wait_all();
75
    g_assert_cmpint(active, ==, 0);
76
    g_assert_cmpint(data.n, ==, 1);
77
    g_assert_cmpint(data.ret, ==, 0);
78
}
79

    
80
static void co_test_cb(void *opaque)
81
{
82
    WorkerTestData *data = opaque;
83

    
84
    active = 1;
85
    data->n = 0;
86
    data->ret = -EINPROGRESS;
87
    thread_pool_submit_co(worker_cb, data);
88

    
89
    /* The test continues in test_submit_co, after qemu_coroutine_enter... */
90

    
91
    g_assert_cmpint(data->n, ==, 1);
92
    data->ret = 0;
93
    active--;
94

    
95
    /* The test continues in test_submit_co, after qemu_aio_wait_all... */
96
}
97

    
98
static void test_submit_co(void)
99
{
100
    WorkerTestData data;
101
    Coroutine *co = qemu_coroutine_create(co_test_cb);
102

    
103
    qemu_coroutine_enter(co, &data);
104

    
105
    /* Back here once the worker has started.  */
106

    
107
    g_assert_cmpint(active, ==, 1);
108
    g_assert_cmpint(data.ret, ==, -EINPROGRESS);
109

    
110
    /* qemu_aio_wait_all will execute the rest of the coroutine.  */
111

    
112
    qemu_aio_wait_all();
113

    
114
    /* Back here after the coroutine has finished.  */
115

    
116
    g_assert_cmpint(active, ==, 0);
117
    g_assert_cmpint(data.ret, ==, 0);
118
}
119

    
120
static void test_submit_many(void)
121
{
122
    WorkerTestData data[100];
123
    int i;
124

    
125
    /* Start more work items than there will be threads.  */
126
    for (i = 0; i < 100; i++) {
127
        data[i].n = 0;
128
        data[i].ret = -EINPROGRESS;
129
        thread_pool_submit_aio(worker_cb, &data[i], done_cb, &data[i]);
130
    }
131

    
132
    active = 100;
133
    while (active > 0) {
134
        qemu_aio_wait();
135
    }
136
    for (i = 0; i < 100; i++) {
137
        g_assert_cmpint(data[i].n, ==, 1);
138
        g_assert_cmpint(data[i].ret, ==, 0);
139
    }
140
}
141

    
142
static void test_cancel(void)
143
{
144
    WorkerTestData data[100];
145
    int num_canceled;
146
    int i;
147

    
148
    /* Start more work items than there will be threads, to ensure
149
     * the pool is full.
150
     */
151
    test_submit_many();
152

    
153
    /* Start long running jobs, to ensure we can cancel some.  */
154
    for (i = 0; i < 100; i++) {
155
        data[i].n = 0;
156
        data[i].ret = -EINPROGRESS;
157
        data[i].aiocb = thread_pool_submit_aio(long_cb, &data[i],
158
                                               done_cb, &data[i]);
159
    }
160

    
161
    /* Starting the threads may be left to a bottom half.  Let it
162
     * run, but do not waste too much time...
163
     */
164
    active = 100;
165
    qemu_aio_wait_nonblocking();
166

    
167
    /* Wait some time for the threads to start, with some sanity
168
     * testing on the behavior of the scheduler...
169
     */
170
    g_assert_cmpint(active, ==, 100);
171
    g_usleep(1000000);
172
    g_assert_cmpint(active, >, 50);
173

    
174
    /* Cancel the jobs that haven't been started yet.  */
175
    num_canceled = 0;
176
    for (i = 0; i < 100; i++) {
177
        if (__sync_val_compare_and_swap(&data[i].n, 0, 3) == 0) {
178
            data[i].ret = -ECANCELED;
179
            bdrv_aio_cancel(data[i].aiocb);
180
            active--;
181
            num_canceled++;
182
        }
183
    }
184
    g_assert_cmpint(active, >, 0);
185
    g_assert_cmpint(num_canceled, <, 100);
186

    
187
    /* Canceling the others will be a blocking operation.  */
188
    for (i = 0; i < 100; i++) {
189
        if (data[i].n != 3) {
190
            bdrv_aio_cancel(data[i].aiocb);
191
        }
192
    }
193

    
194
    /* Finish execution and execute any remaining callbacks.  */
195
    qemu_aio_wait_all();
196
    g_assert_cmpint(active, ==, 0);
197
    for (i = 0; i < 100; i++) {
198
        if (data[i].n == 3) {
199
            g_assert_cmpint(data[i].ret, ==, -ECANCELED);
200
            g_assert(data[i].aiocb != NULL);
201
        } else {
202
            g_assert_cmpint(data[i].n, ==, 2);
203
            g_assert_cmpint(data[i].ret, ==, 0);
204
            g_assert(data[i].aiocb == NULL);
205
        }
206
    }
207
}
208

    
209
int main(int argc, char **argv)
210
{
211
    /* These should be removed once each AioContext has its thread pool.
212
     * The test should create its own AioContext.
213
     */
214
    qemu_init_main_loop();
215
    bdrv_init();
216

    
217
    g_test_init(&argc, &argv, NULL);
218
    g_test_add_func("/thread-pool/submit", test_submit);
219
    g_test_add_func("/thread-pool/submit-aio", test_submit_aio);
220
    g_test_add_func("/thread-pool/submit-co", test_submit_co);
221
    g_test_add_func("/thread-pool/submit-many", test_submit_many);
222
    g_test_add_func("/thread-pool/cancel", test_cancel);
223
    return g_test_run();
224
}