Revision f7311ccc thread-pool.c

b/thread-pool.c
24 24
#include "qemu/event_notifier.h"
25 25
#include "block/thread-pool.h"
26 26

  
27
typedef struct ThreadPool ThreadPool;
28

  
29 27
static void do_spawn_thread(ThreadPool *pool);
30 28

  
31 29
typedef struct ThreadPoolElement ThreadPoolElement;
......
59 57

  
60 58
struct ThreadPool {
61 59
    EventNotifier notifier;
60
    AioContext *ctx;
62 61
    QemuMutex lock;
63 62
    QemuCond check_cancel;
63
    QemuCond worker_stopped;
64 64
    QemuSemaphore sem;
65 65
    int max_threads;
66 66
    QEMUBH *new_thread_bh;
......
75 75
    int new_threads;     /* backlog of threads we need to create */
76 76
    int pending_threads; /* threads created but not running yet */
77 77
    int pending_cancellations; /* whether we need a cond_broadcast */
78
    bool stopping;
78 79
};
79 80

  
80 81
/* Currently there is only one thread pool instance. */
......
88 89
    pool->pending_threads--;
89 90
    do_spawn_thread(pool);
90 91

  
91
    while (1) {
92
    while (!pool->stopping) {
92 93
        ThreadPoolElement *req;
93 94
        int ret;
94 95

  
......
99 100
            qemu_mutex_lock(&pool->lock);
100 101
            pool->idle_threads--;
101 102
        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
102
        if (ret == -1) {
103
        if (ret == -1 || pool->stopping) {
103 104
            break;
104 105
        }
105 106

  
......
124 125
    }
125 126

  
126 127
    pool->cur_threads--;
128
    qemu_cond_signal(&pool->worker_stopped);
127 129
    qemu_mutex_unlock(&pool->lock);
128 130
    return NULL;
129 131
}
......
298 300

  
299 301
    memset(pool, 0, sizeof(*pool));
300 302
    event_notifier_init(&pool->notifier, false);
303
    pool->ctx = ctx;
301 304
    qemu_mutex_init(&pool->lock);
302 305
    qemu_cond_init(&pool->check_cancel);
306
    qemu_cond_init(&pool->worker_stopped);
303 307
    qemu_sem_init(&pool->sem, 0);
304 308
    pool->max_threads = 64;
305 309
    pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
......
311 315
                           thread_pool_active);
312 316
}
313 317

  
318
ThreadPool *thread_pool_new(AioContext *ctx)
319
{
320
    ThreadPool *pool = g_new(ThreadPool, 1);
321
    thread_pool_init_one(pool, ctx);
322
    return pool;
323
}
324

  
325
void thread_pool_free(ThreadPool *pool)
326
{
327
    if (!pool) {
328
        return;
329
    }
330

  
331
    assert(QLIST_EMPTY(&pool->head));
332

  
333
    qemu_mutex_lock(&pool->lock);
334

  
335
    /* Stop new threads from spawning */
336
    qemu_bh_delete(pool->new_thread_bh);
337
    pool->cur_threads -= pool->new_threads;
338
    pool->new_threads = 0;
339

  
340
    /* Wait for worker threads to terminate */
341
    pool->stopping = true;
342
    while (pool->cur_threads > 0) {
343
        qemu_sem_post(&pool->sem);
344
        qemu_cond_wait(&pool->worker_stopped, &pool->lock);
345
    }
346

  
347
    qemu_mutex_unlock(&pool->lock);
348

  
349
    aio_set_event_notifier(pool->ctx, &pool->notifier, NULL, NULL);
350
    qemu_sem_destroy(&pool->sem);
351
    qemu_cond_destroy(&pool->check_cancel);
352
    qemu_cond_destroy(&pool->worker_stopped);
353
    qemu_mutex_destroy(&pool->lock);
354
    event_notifier_cleanup(&pool->notifier);
355
    g_free(pool);
356
}
357

  
314 358
static void thread_pool_init(void)
315 359
{
316 360
    thread_pool_init_one(&global_pool, NULL);

Also available in: Unified diff