24 |
24 |
#include "qemu/event_notifier.h"
|
25 |
25 |
#include "block/thread-pool.h"
|
26 |
26 |
|
27 |
|
typedef struct ThreadPool ThreadPool;
|
28 |
|
|
29 |
27 |
static void do_spawn_thread(ThreadPool *pool);
|
30 |
28 |
|
31 |
29 |
typedef struct ThreadPoolElement ThreadPoolElement;
|
... | ... | |
59 |
57 |
|
60 |
58 |
struct ThreadPool {
|
61 |
59 |
EventNotifier notifier;
|
|
60 |
AioContext *ctx;
|
62 |
61 |
QemuMutex lock;
|
63 |
62 |
QemuCond check_cancel;
|
|
63 |
QemuCond worker_stopped;
|
64 |
64 |
QemuSemaphore sem;
|
65 |
65 |
int max_threads;
|
66 |
66 |
QEMUBH *new_thread_bh;
|
... | ... | |
75 |
75 |
int new_threads; /* backlog of threads we need to create */
|
76 |
76 |
int pending_threads; /* threads created but not running yet */
|
77 |
77 |
int pending_cancellations; /* whether we need a cond_broadcast */
|
|
78 |
bool stopping;
|
78 |
79 |
};
|
79 |
80 |
|
80 |
81 |
/* Currently there is only one thread pool instance. */
|
... | ... | |
88 |
89 |
pool->pending_threads--;
|
89 |
90 |
do_spawn_thread(pool);
|
90 |
91 |
|
91 |
|
while (1) {
|
|
92 |
while (!pool->stopping) {
|
92 |
93 |
ThreadPoolElement *req;
|
93 |
94 |
int ret;
|
94 |
95 |
|
... | ... | |
99 |
100 |
qemu_mutex_lock(&pool->lock);
|
100 |
101 |
pool->idle_threads--;
|
101 |
102 |
} while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
|
102 |
|
if (ret == -1) {
|
|
103 |
if (ret == -1 || pool->stopping) {
|
103 |
104 |
break;
|
104 |
105 |
}
|
105 |
106 |
|
... | ... | |
124 |
125 |
}
|
125 |
126 |
|
126 |
127 |
pool->cur_threads--;
|
|
128 |
qemu_cond_signal(&pool->worker_stopped);
|
127 |
129 |
qemu_mutex_unlock(&pool->lock);
|
128 |
130 |
return NULL;
|
129 |
131 |
}
|
... | ... | |
298 |
300 |
|
299 |
301 |
memset(pool, 0, sizeof(*pool));
|
300 |
302 |
event_notifier_init(&pool->notifier, false);
|
|
303 |
pool->ctx = ctx;
|
301 |
304 |
qemu_mutex_init(&pool->lock);
|
302 |
305 |
qemu_cond_init(&pool->check_cancel);
|
|
306 |
qemu_cond_init(&pool->worker_stopped);
|
303 |
307 |
qemu_sem_init(&pool->sem, 0);
|
304 |
308 |
pool->max_threads = 64;
|
305 |
309 |
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
|
... | ... | |
311 |
315 |
thread_pool_active);
|
312 |
316 |
}
|
313 |
317 |
|
|
318 |
ThreadPool *thread_pool_new(AioContext *ctx)
|
|
319 |
{
|
|
320 |
ThreadPool *pool = g_new(ThreadPool, 1);
|
|
321 |
thread_pool_init_one(pool, ctx);
|
|
322 |
return pool;
|
|
323 |
}
|
|
324 |
|
|
325 |
void thread_pool_free(ThreadPool *pool)
|
|
326 |
{
|
|
327 |
if (!pool) {
|
|
328 |
return;
|
|
329 |
}
|
|
330 |
|
|
331 |
assert(QLIST_EMPTY(&pool->head));
|
|
332 |
|
|
333 |
qemu_mutex_lock(&pool->lock);
|
|
334 |
|
|
335 |
/* Stop new threads from spawning */
|
|
336 |
qemu_bh_delete(pool->new_thread_bh);
|
|
337 |
pool->cur_threads -= pool->new_threads;
|
|
338 |
pool->new_threads = 0;
|
|
339 |
|
|
340 |
/* Wait for worker threads to terminate */
|
|
341 |
pool->stopping = true;
|
|
342 |
while (pool->cur_threads > 0) {
|
|
343 |
qemu_sem_post(&pool->sem);
|
|
344 |
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
|
|
345 |
}
|
|
346 |
|
|
347 |
qemu_mutex_unlock(&pool->lock);
|
|
348 |
|
|
349 |
aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
|
|
350 |
qemu_sem_destroy(&pool->sem);
|
|
351 |
qemu_cond_destroy(&pool->check_cancel);
|
|
352 |
qemu_cond_destroy(&pool->worker_stopped);
|
|
353 |
qemu_mutex_destroy(&pool->lock);
|
|
354 |
event_notifier_cleanup(&pool->notifier);
|
|
355 |
g_free(pool);
|
|
356 |
}
|
|
357 |
|
314 |
358 |
static void thread_pool_init(void)
|
315 |
359 |
{
|
316 |
360 |
thread_pool_init_one(&global_pool, NULL);
|