/*
 * QEMU block layer thread pool
 *
 * Copyright IBM, Corp. 2008
 * Copyright Red Hat, Inc. 2012
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *  Paolo Bonzini     <pbonzini@redhat.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 * Contributions after 2012-01-13 are licensed under the terms of the
 * GNU GPL, version 2 or (at your option) any later version.
 */
#include "qemu-common.h"
#include "qemu/queue.h"
#include "qemu/thread.h"
#include "qemu/osdep.h"
#include "block/coroutine.h"
#include "trace.h"
#include "block/block_int.h"
#include "qemu/event_notifier.h"
#include "block/thread-pool.h"
#include "qemu/main-loop.h"

static void do_spawn_thread(ThreadPool *pool);

typedef struct ThreadPoolElement ThreadPoolElement;

enum ThreadState {
    THREAD_QUEUED,
    THREAD_ACTIVE,
    THREAD_DONE,
    THREAD_CANCELED,
};
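
/* Valid state transitions (summary of the code below):
 *
 *   THREAD_QUEUED ---> THREAD_ACTIVE ---> THREAD_DONE
 *        |
 *        +-----------> THREAD_CANCELED
 *
 * A queued element is either picked up by a worker (QUEUED -> ACTIVE ->
 * DONE) or stolen by thread_pool_cancel() before any worker claims it
 * (QUEUED -> CANCELED).
 */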

struct ThreadPoolElement {
    BlockDriverAIOCB common;
    ThreadPool *pool;
    ThreadPoolFunc *func;
    void *arg;

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};

struct ThreadPool {
    EventNotifier notifier;
    AioContext *ctx;
    QemuMutex lock;
    QemuCond check_cancel;
    QemuCond worker_stopped;
    QemuSemaphore sem;
    int max_threads;
    QEMUBH *new_thread_bh;

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;
};
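
/* How a request flows through the pool (summary of the code below):
 *
 * thread_pool_submit_aio() links a ThreadPoolElement into request_list and
 * posts to sem; a worker wakes up, marks the element THREAD_ACTIVE, drops
 * the lock and calls func(arg).  When func returns, the worker stores ret,
 * issues a write barrier, sets THREAD_DONE and kicks the EventNotifier.
 * event_notifier_ready() then runs in the pool's AioContext, invokes the
 * caller's completion callback and releases the element.  A worker that
 * sees no work for 10 seconds exits on its own.
 */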

static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);
        if (pool->pending_cancellations) {
            qemu_cond_broadcast(&pool->check_cancel);
        }

        event_notifier_set(&pool->notifier);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}
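
/* Worker creation happens in two stages: spawn_thread() only counts the new
 * thread and schedules a bottom half in the pool's home AioContext;
 * do_spawn_thread() actually calls qemu_thread_create().  Each new worker
 * calls do_spawn_thread() again before entering its loop, so a backlog of
 * new_threads drains one thread at a time instead of in a long loop under
 * the lock.
 */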

static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread t;

    /* Runs with lock taken.  */
    if (!pool->new_threads) {
        return;
    }

    pool->new_threads--;
    pool->pending_threads++;

    qemu_thread_create(&t, worker_thread, pool, QEMU_THREAD_DETACHED);
}

static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}

static void spawn_thread(ThreadPool *pool)
{
    pool->cur_threads++;
    pool->new_threads++;
    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (!pool->pending_threads) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}
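
/* Completion handler, invoked in the AioContext when a worker kicks the
 * EventNotifier.  A completion callback may submit new work or release
 * elements, invalidating the saved next pointer, so the list walk restarts
 * from the head after every callback.
 */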

static void event_notifier_ready(EventNotifier *notifier)
{
    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
    ThreadPoolElement *elem, *next;

    event_notifier_test_and_clear(notifier);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            continue;
        }
        if (elem->state == THREAD_DONE) {
            trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                       elem->ret);
        }
        if (elem->state == THREAD_DONE && elem->common.cb) {
            QLIST_REMOVE(elem, all);
            /* Read state before ret.  */
            smp_rmb();
            elem->common.cb(elem->common.opaque, elem->ret);
            qemu_aio_release(elem);
            goto restart;
        } else {
            /* remove the request */
            QLIST_REMOVE(elem, all);
            qemu_aio_release(elem);
        }
    }
}
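
/* Synchronous cancellation: either the request is still queued and can be
 * stolen before a worker claims it, or we wait on check_cancel (counted by
 * pending_cancellations) until the element reaches THREAD_CANCELED or
 * THREAD_DONE.
 */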

static void thread_pool_cancel(BlockDriverAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem; we can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        elem->state = THREAD_CANCELED;
        event_notifier_set(&pool->notifier);
    } else {
        pool->pending_cancellations++;
        while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            qemu_cond_wait(&pool->check_cancel, &pool->lock);
        }
        pool->pending_cancellations--;
    }
    qemu_mutex_unlock(&pool->lock);
}

static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel             = thread_pool_cancel,
};

BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    qemu_sem_post(&pool->sem);
    return &req->common;
}
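
/* Example (illustrative sketch, not part of this file): running a blocking
 * function on the pool with a completion callback.  copy_worker(),
 * copy_done(), do_blocking_copy() and CopyJob are hypothetical names.
 *
 *     static int copy_worker(void *arg)        // runs in a worker thread
 *     {
 *         CopyJob *job = arg;
 *         return do_blocking_copy(job);        // may block; <0 on error
 *     }
 *
 *     static void copy_done(void *opaque, int ret)  // runs in the AioContext
 *     {
 *         CopyJob *job = opaque;
 *         job->status = ret;
 *     }
 *
 *     thread_pool_submit_aio(pool, copy_worker, job, copy_done, job);
 */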

typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;

static void thread_pool_co_cb(void *opaque, int ret)
{
    ThreadPoolCo *co = opaque;

    co->ret = ret;
    qemu_coroutine_enter(co->co, NULL);
}

int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo tpc = { .co = qemu_coroutine_self(), .ret = -EINPROGRESS };
    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &tpc);
    qemu_coroutine_yield();
    return tpc.ret;
}
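
/* Example (illustrative sketch, not part of this file): from coroutine
 * context the submission looks synchronous; the coroutine yields until the
 * worker finishes.  fsync_worker() and co_fsync() are hypothetical names.
 *
 *     static int fsync_worker(void *arg)
 *     {
 *         return qemu_fdatasync((intptr_t)arg) < 0 ? -errno : 0;
 *     }
 *
 *     int coroutine_fn co_fsync(ThreadPool *pool, int fd)
 *     {
 *         return thread_pool_submit_co(pool, fsync_worker,
 *                                      (void *)(intptr_t)fd);
 *     }
 */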

void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}

static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    event_notifier_init(&pool->notifier, false);
    pool->ctx = ctx;
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->check_cancel);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0);
    pool->max_threads = 64;
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready);
}

ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}

void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->check_cancel);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    event_notifier_cleanup(&pool->notifier);
    g_free(pool);
}