Statistics
| Branch: | Revision:

root / thread-pool.c @ f6049f44

History | View | Annotate | Download (9.7 kB)

1 d354c7ec Paolo Bonzini
/*
2 d354c7ec Paolo Bonzini
 * QEMU block layer thread pool
3 d354c7ec Paolo Bonzini
 *
4 d354c7ec Paolo Bonzini
 * Copyright IBM, Corp. 2008
5 d354c7ec Paolo Bonzini
 * Copyright Red Hat, Inc. 2012
6 d354c7ec Paolo Bonzini
 *
7 d354c7ec Paolo Bonzini
 * Authors:
8 d354c7ec Paolo Bonzini
 *  Anthony Liguori   <aliguori@us.ibm.com>
9 d354c7ec Paolo Bonzini
 *  Paolo Bonzini     <pbonzini@redhat.com>
10 d354c7ec Paolo Bonzini
 *
11 d354c7ec Paolo Bonzini
 * This work is licensed under the terms of the GNU GPL, version 2.  See
12 d354c7ec Paolo Bonzini
 * the COPYING file in the top-level directory.
13 d354c7ec Paolo Bonzini
 *
14 d354c7ec Paolo Bonzini
 * Contributions after 2012-01-13 are licensed under the terms of the
15 d354c7ec Paolo Bonzini
 * GNU GPL, version 2 or (at your option) any later version.
16 d354c7ec Paolo Bonzini
 */
17 d354c7ec Paolo Bonzini
#include "qemu-common.h"
18 1de7afc9 Paolo Bonzini
#include "qemu/queue.h"
19 1de7afc9 Paolo Bonzini
#include "qemu/thread.h"
20 1de7afc9 Paolo Bonzini
#include "qemu/osdep.h"
21 737e150e Paolo Bonzini
#include "block/coroutine.h"
22 d354c7ec Paolo Bonzini
#include "trace.h"
23 737e150e Paolo Bonzini
#include "block/block_int.h"
24 1de7afc9 Paolo Bonzini
#include "qemu/event_notifier.h"
25 737e150e Paolo Bonzini
#include "block/thread-pool.h"
26 d354c7ec Paolo Bonzini
27 b811203c Stefan Hajnoczi
static void do_spawn_thread(ThreadPool *pool);
28 d354c7ec Paolo Bonzini
29 d354c7ec Paolo Bonzini
typedef struct ThreadPoolElement ThreadPoolElement;
30 d354c7ec Paolo Bonzini
31 d354c7ec Paolo Bonzini
/* Lifecycle of a work item.  A request normally moves QUEUED -> ACTIVE ->
 * DONE; CANCELED is reached only from QUEUED, when thread_pool_cancel
 * steals the request back before any worker picks it up.
 */
enum ThreadState {
    THREAD_QUEUED,      /* on request_list, waiting for a worker */
    THREAD_ACTIVE,      /* a worker thread is running func(arg) */
    THREAD_DONE,        /* func has returned; ret is valid */
    THREAD_CANCELED,    /* dequeued by thread_pool_cancel before running */
};
37 d354c7ec Paolo Bonzini
38 d354c7ec Paolo Bonzini
/* One queued or running piece of work.  Embeds the AIOCB so the pool can
 * complete it through the normal block-layer callback machinery.
 */
struct ThreadPoolElement {
    BlockDriverAIOCB common;    /* must be first: the AIOCB is cast back to
                                 * ThreadPoolElement in thread_pool_cancel */
    ThreadPool *pool;           /* owning pool, set at submit time */
    ThreadPoolFunc *func;       /* work function run in a worker thread */
    void *arg;                  /* opaque argument passed to func */

    /* Moving state out of THREAD_QUEUED is protected by lock.  After
     * that, only the worker thread can write to it.  Reads and writes
     * of state and ret are ordered with memory barriers.
     */
    enum ThreadState state;
    int ret;                    /* return value of func, valid once DONE */

    /* Access to this list is protected by lock.  */
    QTAILQ_ENTRY(ThreadPoolElement) reqs;

    /* Access to this list is protected by the global mutex.  */
    QLIST_ENTRY(ThreadPoolElement) all;
};
57 d354c7ec Paolo Bonzini
58 b811203c Stefan Hajnoczi
/* A pool of worker threads feeding off a single request queue, with
 * completions delivered back into one AioContext via an EventNotifier.
 */
struct ThreadPool {
    EventNotifier notifier;     /* kicked by workers when a request finishes */
    AioContext *ctx;            /* context that runs the completion handler */
    QemuMutex lock;             /* protects the fields noted below */
    QemuCond check_cancel;      /* broadcast after a request completes while
                                 * cancellations are pending */
    QemuCond worker_stopped;    /* signalled by each worker as it exits */
    QemuSemaphore sem;          /* counts queued requests; workers wait on it */
    int max_threads;
    QEMUBH *new_thread_bh;      /* runs do_spawn_thread from the pool's ctx */

    /* The following variables are only accessed from one AioContext. */
    QLIST_HEAD(, ThreadPoolElement) head;

    /* The following variables are protected by lock.  */
    QTAILQ_HEAD(, ThreadPoolElement) request_list;
    int cur_threads;
    int idle_threads;
    int new_threads;     /* backlog of threads we need to create */
    int pending_threads; /* threads created but not running yet */
    int pending_cancellations; /* whether we need a cond_broadcast */
    bool stopping;       /* set by thread_pool_free to stop all workers */
};
80 b811203c Stefan Hajnoczi
81 b811203c Stefan Hajnoczi
/* Worker thread main loop: pull requests off request_list and run them
 * until the idle wait times out with nothing queued, or the pool is being
 * torn down.
 */
static void *worker_thread(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    pool->pending_threads--;
    /* Chain-spawn: each new worker creates the next one from the backlog,
     * so the bottom half only ever has to start the first thread.
     */
    do_spawn_thread(pool);

    while (!pool->stopping) {
        ThreadPoolElement *req;
        int ret;

        do {
            pool->idle_threads++;
            qemu_mutex_unlock(&pool->lock);
            ret = qemu_sem_timedwait(&pool->sem, 10000);
            qemu_mutex_lock(&pool->lock);
            pool->idle_threads--;
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
        if (ret == -1 || pool->stopping) {
            /* Timed out idle, or shutdown requested: exit this worker.  */
            break;
        }

        req = QTAILQ_FIRST(&pool->request_list);
        QTAILQ_REMOVE(&pool->request_list, req, reqs);
        req->state = THREAD_ACTIVE;
        qemu_mutex_unlock(&pool->lock);

        /* Run the work function with the lock dropped.  */
        ret = req->func(req->arg);

        req->ret = ret;
        /* Write ret before state.  */
        smp_wmb();
        req->state = THREAD_DONE;

        qemu_mutex_lock(&pool->lock);
        if (pool->pending_cancellations) {
            /* Wake thread_pool_cancel callers waiting for this request.  */
            qemu_cond_broadcast(&pool->check_cancel);
        }

        /* Wake the AioContext so event_notifier_ready completes requests.  */
        event_notifier_set(&pool->notifier);
    }

    pool->cur_threads--;
    qemu_cond_signal(&pool->worker_stopped);
    qemu_mutex_unlock(&pool->lock);
    return NULL;
}
129 d354c7ec Paolo Bonzini
130 b811203c Stefan Hajnoczi
/* Start one worker thread from the backlog, if any.  Runs with
 * pool->lock taken.
 */
static void do_spawn_thread(ThreadPool *pool)
{
    QemuThread thread;

    if (pool->new_threads == 0) {
        /* No backlog; nothing to create.  */
        return;
    }

    pool->pending_threads++;
    pool->new_threads--;

    qemu_thread_create(&thread, worker_thread, pool, QEMU_THREAD_DETACHED);
}
144 d354c7ec Paolo Bonzini
145 d354c7ec Paolo Bonzini
/* Bottom half scheduled by spawn_thread: creates backlogged workers from
 * the pool's own context so new threads inherit its affinity.
 */
static void spawn_thread_bh_fn(void *opaque)
{
    ThreadPool *pool = opaque;

    qemu_mutex_lock(&pool->lock);
    do_spawn_thread(pool);
    qemu_mutex_unlock(&pool->lock);
}
153 d354c7ec Paolo Bonzini
154 b811203c Stefan Hajnoczi
/* Request one more worker thread.  Caller must hold pool->lock.  */
static void spawn_thread(ThreadPool *pool)
{
    pool->new_threads++;
    pool->cur_threads++;

    /* If there are threads being created, they will spawn new workers, so
     * we don't spend time creating many threads in a loop holding a mutex or
     * starving the current vcpu.
     *
     * If there are no idle threads, ask the main thread to create one, so we
     * inherit the correct affinity instead of the vcpu affinity.
     */
    if (pool->pending_threads == 0) {
        qemu_bh_schedule(pool->new_thread_bh);
    }
}
169 d354c7ec Paolo Bonzini
170 d354c7ec Paolo Bonzini
/* Completion handler, run in the pool's AioContext when a worker kicks the
 * notifier.  Releases every CANCELED or DONE element, invoking the AIO
 * callback for DONE elements that have one.
 */
static void event_notifier_ready(EventNotifier *notifier)
{
    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);
    ThreadPoolElement *elem, *next;

    event_notifier_test_and_clear(notifier);
restart:
    QLIST_FOREACH_SAFE(elem, &pool->head, all, next) {
        if (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            /* Still queued or running; a later notification handles it.  */
            continue;
        }
        if (elem->state == THREAD_DONE) {
            trace_thread_pool_complete(pool, elem, elem->common.opaque,
                                       elem->ret);
        }
        if (elem->state == THREAD_DONE && elem->common.cb) {
            QLIST_REMOVE(elem, all);
            /* Read state before ret.  */
            smp_rmb();
            elem->common.cb(elem->common.opaque, elem->ret);
            qemu_aio_release(elem);
            /* The callback may have modified the list (e.g. by submitting
             * new work), so restart the walk from the head.
             */
            goto restart;
        } else {
            /* remove the request */
            QLIST_REMOVE(elem, all);
            qemu_aio_release(elem);
        }
    }
}
199 d354c7ec Paolo Bonzini
200 d354c7ec Paolo Bonzini
/* Reports whether any submitted requests are still outstanding; registered
 * alongside the notifier in thread_pool_init_one.
 */
static int thread_pool_active(EventNotifier *notifier)
{
    ThreadPool *pool = container_of(notifier, ThreadPool, notifier);

    return QLIST_EMPTY(&pool->head) ? 0 : 1;
}
205 d354c7ec Paolo Bonzini
206 d354c7ec Paolo Bonzini
/* .cancel implementation: either steal the request before any worker picks
 * it up, or block until the worker running it has finished.
 */
static void thread_pool_cancel(BlockDriverAIOCB *acb)
{
    ThreadPoolElement *elem = (ThreadPoolElement *)acb;
    ThreadPool *pool = elem->pool;

    trace_thread_pool_cancel(elem, elem->common.opaque);

    qemu_mutex_lock(&pool->lock);
    if (elem->state == THREAD_QUEUED &&
        /* No thread has yet started working on elem. we can try to "steal"
         * the item from the worker if we can get a signal from the
         * semaphore.  Because this is non-blocking, we can do it with
         * the lock taken and ensure that elem will remain THREAD_QUEUED.
         */
        qemu_sem_timedwait(&pool->sem, 0) == 0) {
        QTAILQ_REMOVE(&pool->request_list, elem, reqs);
        elem->state = THREAD_CANCELED;
        /* Let event_notifier_ready release the canceled element.  */
        event_notifier_set(&pool->notifier);
    } else {
        /* Too late to steal; wait until the worker finishes the request.  */
        pool->pending_cancellations++;
        while (elem->state != THREAD_CANCELED && elem->state != THREAD_DONE) {
            qemu_cond_wait(&pool->check_cancel, &pool->lock);
        }
        pool->pending_cancellations--;
    }
    qemu_mutex_unlock(&pool->lock);
}
233 d354c7ec Paolo Bonzini
234 d7331bed Stefan Hajnoczi
/* AIOCB operations for requests allocated by thread_pool_submit_aio.  */
static const AIOCBInfo thread_pool_aiocb_info = {
    .aiocb_size         = sizeof(ThreadPoolElement),
    .cancel             = thread_pool_cancel,
};
238 d354c7ec Paolo Bonzini
239 c4d9d196 Stefan Hajnoczi
/* Submit func(arg) to the pool.  Returns an AIOCB; cb(opaque, ret) runs in
 * the pool's AioContext once the work completes (cb may be NULL).
 */
BlockDriverAIOCB *thread_pool_submit_aio(ThreadPool *pool,
        ThreadPoolFunc *func, void *arg,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    ThreadPoolElement *req;

    req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
    req->func = func;
    req->arg = arg;
    req->state = THREAD_QUEUED;
    req->pool = pool;

    /* head is only touched from the pool's AioContext: no lock needed.  */
    QLIST_INSERT_HEAD(&pool->head, req, all);

    trace_thread_pool_submit(pool, req, arg);

    qemu_mutex_lock(&pool->lock);
    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
        spawn_thread(pool);
    }
    QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
    qemu_mutex_unlock(&pool->lock);
    /* Wake one worker to service the newly queued request.  */
    qemu_sem_post(&pool->sem);
    return &req->common;
}
264 d354c7ec Paolo Bonzini
265 d354c7ec Paolo Bonzini
/* Bounce buffer for thread_pool_submit_co: records the coroutine to
 * re-enter and the work function's return value.
 */
typedef struct ThreadPoolCo {
    Coroutine *co;
    int ret;
} ThreadPoolCo;
269 d354c7ec Paolo Bonzini
270 d354c7ec Paolo Bonzini
static void thread_pool_co_cb(void *opaque, int ret)
271 d354c7ec Paolo Bonzini
{
272 d354c7ec Paolo Bonzini
    ThreadPoolCo *co = opaque;
273 d354c7ec Paolo Bonzini
274 d354c7ec Paolo Bonzini
    co->ret = ret;
275 d354c7ec Paolo Bonzini
    qemu_coroutine_enter(co->co, NULL);
276 d354c7ec Paolo Bonzini
}
277 d354c7ec Paolo Bonzini
278 c4d9d196 Stefan Hajnoczi
/* Run func(arg) in the pool and yield the current coroutine until it
 * completes; returns func's return value.  Must be called from coroutine
 * context.
 */
int coroutine_fn thread_pool_submit_co(ThreadPool *pool, ThreadPoolFunc *func,
                                       void *arg)
{
    ThreadPoolCo co_data = {
        .co  = qemu_coroutine_self(),
        .ret = -EINPROGRESS,
    };

    assert(qemu_in_coroutine());
    thread_pool_submit_aio(pool, func, arg, thread_pool_co_cb, &co_data);
    qemu_coroutine_yield();
    return co_data.ret;
}
287 d354c7ec Paolo Bonzini
288 c4d9d196 Stefan Hajnoczi
/* Fire-and-forget submission: no completion callback, AIOCB discarded.  */
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
{
    (void)thread_pool_submit_aio(pool, func, arg, NULL, NULL);
}
292 d354c7ec Paolo Bonzini
293 b811203c Stefan Hajnoczi
/* Initialize pool and register its completion notifier with ctx.
 * A NULL ctx means the main (global) AioContext.
 */
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
{
    if (!ctx) {
        ctx = qemu_get_aio_context();
    }

    memset(pool, 0, sizeof(*pool));
    event_notifier_init(&pool->notifier, false);
    pool->ctx = ctx;
    qemu_mutex_init(&pool->lock);
    qemu_cond_init(&pool->check_cancel);
    qemu_cond_init(&pool->worker_stopped);
    qemu_sem_init(&pool->sem, 0); /* no queued requests yet */
    pool->max_threads = 64;       /* cap on concurrent worker threads */
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);

    QLIST_INIT(&pool->head);
    QTAILQ_INIT(&pool->request_list);

    aio_set_event_notifier(ctx, &pool->notifier, event_notifier_ready,
                           thread_pool_active);
}
315 b811203c Stefan Hajnoczi
316 f7311ccc Stefan Hajnoczi
/* Allocate and initialize a thread pool bound to ctx (NULL = main context).
 * Free with thread_pool_free.
 */
ThreadPool *thread_pool_new(AioContext *ctx)
{
    ThreadPool *pool;

    pool = g_new(ThreadPool, 1);
    thread_pool_init_one(pool, ctx);
    return pool;
}
322 f7311ccc Stefan Hajnoczi
323 f7311ccc Stefan Hajnoczi
/* Tear down a pool created by thread_pool_new.  All submitted requests
 * must already have completed (head must be empty).  NULL is a no-op.
 */
void thread_pool_free(ThreadPool *pool)
{
    if (!pool) {
        return;
    }

    assert(QLIST_EMPTY(&pool->head));

    qemu_mutex_lock(&pool->lock);

    /* Stop new threads from spawning */
    qemu_bh_delete(pool->new_thread_bh);
    pool->cur_threads -= pool->new_threads;
    pool->new_threads = 0;

    /* Wait for worker threads to terminate */
    pool->stopping = true;
    while (pool->cur_threads > 0) {
        /* Each post wakes one worker, which sees stopping and exits.  */
        qemu_sem_post(&pool->sem);
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
    }

    qemu_mutex_unlock(&pool->lock);

    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
    qemu_sem_destroy(&pool->sem);
    qemu_cond_destroy(&pool->check_cancel);
    qemu_cond_destroy(&pool->worker_stopped);
    qemu_mutex_destroy(&pool->lock);
    event_notifier_cleanup(&pool->notifier);
    g_free(pool);
}