Revision f7311ccc thread-pool.c
b/thread-pool.c | ||
---|---|---|
24 | 24 |
#include "qemu/event_notifier.h" |
25 | 25 |
#include "block/thread-pool.h" |
26 | 26 |
|
27 |
typedef struct ThreadPool ThreadPool; |
|
28 |
|
|
29 | 27 |
static void do_spawn_thread(ThreadPool *pool); |
30 | 28 |
|
31 | 29 |
typedef struct ThreadPoolElement ThreadPoolElement; |
... | ... | |
59 | 57 |
|
60 | 58 |
struct ThreadPool { |
61 | 59 |
EventNotifier notifier; |
60 |
AioContext *ctx; |
|
62 | 61 |
QemuMutex lock; |
63 | 62 |
QemuCond check_cancel; |
63 |
QemuCond worker_stopped; |
|
64 | 64 |
QemuSemaphore sem; |
65 | 65 |
int max_threads; |
66 | 66 |
QEMUBH *new_thread_bh; |
... | ... | |
75 | 75 |
int new_threads; /* backlog of threads we need to create */ |
76 | 76 |
int pending_threads; /* threads created but not running yet */ |
77 | 77 |
int pending_cancellations; /* whether we need a cond_broadcast */ |
78 |
bool stopping; |
|
78 | 79 |
}; |
79 | 80 |
|
80 | 81 |
/* Currently there is only one thread pool instance. */ |
... | ... | |
88 | 89 |
pool->pending_threads--; |
89 | 90 |
do_spawn_thread(pool); |
90 | 91 |
|
91 |
while (1) {
|
|
92 |
while (!pool->stopping) {
|
|
92 | 93 |
ThreadPoolElement *req; |
93 | 94 |
int ret; |
94 | 95 |
|
... | ... | |
99 | 100 |
qemu_mutex_lock(&pool->lock); |
100 | 101 |
pool->idle_threads--; |
101 | 102 |
} while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list)); |
102 |
if (ret == -1) { |
|
103 |
if (ret == -1 || pool->stopping) {
|
|
103 | 104 |
break; |
104 | 105 |
} |
105 | 106 |
|
... | ... | |
124 | 125 |
} |
125 | 126 |
|
126 | 127 |
pool->cur_threads--; |
128 |
qemu_cond_signal(&pool->worker_stopped); |
|
127 | 129 |
qemu_mutex_unlock(&pool->lock); |
128 | 130 |
return NULL; |
129 | 131 |
} |
... | ... | |
298 | 300 |
|
299 | 301 |
memset(pool, 0, sizeof(*pool)); |
300 | 302 |
event_notifier_init(&pool->notifier, false); |
303 |
pool->ctx = ctx; |
|
301 | 304 |
qemu_mutex_init(&pool->lock); |
302 | 305 |
qemu_cond_init(&pool->check_cancel); |
306 |
qemu_cond_init(&pool->worker_stopped); |
|
303 | 307 |
qemu_sem_init(&pool->sem, 0); |
304 | 308 |
pool->max_threads = 64; |
305 | 309 |
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool); |
... | ... | |
311 | 315 |
thread_pool_active); |
312 | 316 |
} |
313 | 317 |
|
318 |
ThreadPool *thread_pool_new(AioContext *ctx) |
|
319 |
{ |
|
320 |
ThreadPool *pool = g_new(ThreadPool, 1); |
|
321 |
thread_pool_init_one(pool, ctx); |
|
322 |
return pool; |
|
323 |
} |
|
324 |
|
|
325 |
void thread_pool_free(ThreadPool *pool) |
|
326 |
{ |
|
327 |
if (!pool) { |
|
328 |
return; |
|
329 |
} |
|
330 |
|
|
331 |
assert(QLIST_EMPTY(&pool->head)); |
|
332 |
|
|
333 |
qemu_mutex_lock(&pool->lock); |
|
334 |
|
|
335 |
/* Stop new threads from spawning */ |
|
336 |
qemu_bh_delete(pool->new_thread_bh); |
|
337 |
pool->cur_threads -= pool->new_threads; |
|
338 |
pool->new_threads = 0; |
|
339 |
|
|
340 |
/* Wait for worker threads to terminate */ |
|
341 |
pool->stopping = true; |
|
342 |
while (pool->cur_threads > 0) { |
|
343 |
qemu_sem_post(&pool->sem); |
|
344 |
qemu_cond_wait(&pool->worker_stopped, &pool->lock); |
|
345 |
} |
|
346 |
|
|
347 |
qemu_mutex_unlock(&pool->lock); |
|
348 |
|
|
349 |
aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL); |
|
350 |
qemu_sem_destroy(&pool->sem); |
|
351 |
qemu_cond_destroy(&pool->check_cancel); |
|
352 |
qemu_cond_destroy(&pool->worker_stopped); |
|
353 |
qemu_mutex_destroy(&pool->lock); |
|
354 |
event_notifier_cleanup(&pool->notifier); |
|
355 |
g_free(pool); |
|
356 |
} |
|
357 |
|
|
314 | 358 |
static void thread_pool_init(void) |
315 | 359 |
{ |
316 | 360 |
thread_pool_init_one(&global_pool, NULL); |
Also available in: Unified diff