Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ f8d3d128

History | View | Annotate | Download (17 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 * Contributions after 2012-01-13 are licensed under the terms of the
13
 * GNU GPL, version 2 or (at your option) any later version.
14
 */
15

    
16
#include <sys/ioctl.h>
17
#include <sys/types.h>
18
#include <pthread.h>
19
#include <unistd.h>
20
#include <errno.h>
21
#include <time.h>
22
#include <string.h>
23
#include <stdlib.h>
24
#include <stdio.h>
25

    
26
#include "qemu-queue.h"
27
#include "osdep.h"
28
#include "sysemu.h"
29
#include "qemu-common.h"
30
#include "trace.h"
31
#include "block_int.h"
32

    
33
#include "block/raw-posix-aio.h"
34

    
35
/* Forward declaration: aio_thread() calls this before its definition. */
static void do_spawn_thread(void);
36

    
37
struct qemu_paiocb {
38
    BlockDriverAIOCB common;
39
    int aio_fildes;
40
    union {
41
        struct iovec *aio_iov;
42
        void *aio_ioctl_buf;
43
    };
44
    int aio_niov;
45
    size_t aio_nbytes;
46
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
47
    off_t aio_offset;
48

    
49
    QTAILQ_ENTRY(qemu_paiocb) node;
50
    int aio_type;
51
    ssize_t ret;
52
    int active;
53
    struct qemu_paiocb *next;
54
};
55

    
56
/*
 * Completion-notification pipe plus the list of requests that have been
 * submitted and not yet released.
 */
typedef struct PosixAioState {
    int rfd;                        /* read end of the notification pipe */
    int wfd;                        /* write end of the notification pipe */
    struct qemu_paiocb *first_aio;  /* head of the outstanding-request list */
} PosixAioState;
60

    
61

    
62
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
63
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
64
static pthread_t thread_id;
65
static pthread_attr_t attr;
66
static int max_threads = 64;
67
static int cur_threads = 0;
68
static int idle_threads = 0;
69
static int new_threads = 0;     /* backlog of threads we need to create */
70
static int pending_threads = 0; /* threads created but not running yet */
71
static QEMUBH *new_thread_bh;
72
static QTAILQ_HEAD(, qemu_paiocb) request_list;
73

    
74
/*
 * Non-zero while preadv()/pwritev() are considered usable.  Starts from the
 * build-time setting and is cleared by handle_aiocb_rw() at run time.
 */
static int preadv_present =
#ifdef CONFIG_PREADV
    1;
#else
    0;
#endif
79

    
80
/*
 * Print "<what> failed: <strerror(err)>" on stderr and abort the process.
 * Does not return.
 */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
85

    
86
/*
 * Fatal-error helper for calls that report their failure through errno:
 * forwards the current errno value and 'what' to die2().
 */
static void die(const char *what)
{
    int saved_errno = errno;

    die2(saved_errno, what);
}
90

    
91
/* Lock 'mutex'; a non-zero result from pthread_mutex_lock() is fatal. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
96

    
97
/* Unlock 'mutex'; a non-zero result from pthread_mutex_unlock() is fatal. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_unlock");
    }
}
102

    
103
/*
 * Wait on 'cond' (with 'mutex' held) until signalled or until the absolute
 * time 'ts' has passed.
 *
 * Returns 0 or ETIMEDOUT; any other result of pthread_cond_timedwait() is
 * treated as fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          struct timespec *ts)
{
    int err = pthread_cond_timedwait(cond, mutex, ts);

    if (err != 0 && err != ETIMEDOUT) {
        die2(err, "pthread_cond_timedwait");
    }
    return err;
}
110

    
111
/* Signal 'cond'; a non-zero result from pthread_cond_signal() is fatal. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err != 0) {
        die2(err, "pthread_cond_signal");
    }
}
116

    
117
/*
 * Start a thread running start_routine(arg) with attributes 'attr' and
 * store its id in '*thread'; a pthread_create() failure is fatal.
 */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err != 0) {
        die2(err, "pthread_create");
    }
}
123

    
124
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
125
{
126
    int ret;
127

    
128
    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
129
    if (ret == -1)
130
        return -errno;
131

    
132
    /*
133
     * This looks weird, but the aio code only considers a request
134
     * successful if it has written the full number of bytes.
135
     *
136
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
137
     * so in fact we return the ioctl command here to make posix_aio_read()
138
     * happy..
139
     */
140
    return aiocb->aio_nbytes;
141
}
142

    
143
static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
144
{
145
    int ret;
146

    
147
    ret = qemu_fdatasync(aiocb->aio_fildes);
148
    if (ret == -1)
149
        return -errno;
150
    return 0;
151
}
152

    
153
/*
 * Thin wrapper around preadv().  When built without CONFIG_PREADV it
 * performs no I/O and returns -ENOSYS.
 */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
#ifdef CONFIG_PREADV
    return preadv(fd, iov, nr_iov, offset);
#else
    return -ENOSYS;
#endif
}

/*
 * Thin wrapper around pwritev().  When built without CONFIG_PREADV it
 * performs no I/O and returns -ENOSYS.
 */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
#ifdef CONFIG_PREADV
    return pwritev(fd, iov, nr_iov, offset);
#else
    return -ENOSYS;
#endif
}
182

    
183
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
184
{
185
    ssize_t len;
186

    
187
    do {
188
        if (aiocb->aio_type & QEMU_AIO_WRITE)
189
            len = qemu_pwritev(aiocb->aio_fildes,
190
                               aiocb->aio_iov,
191
                               aiocb->aio_niov,
192
                               aiocb->aio_offset);
193
         else
194
            len = qemu_preadv(aiocb->aio_fildes,
195
                              aiocb->aio_iov,
196
                              aiocb->aio_niov,
197
                              aiocb->aio_offset);
198
    } while (len == -1 && errno == EINTR);
199

    
200
    if (len == -1)
201
        return -errno;
202
    return len;
203
}
204

    
205
/*
206
 * Read/writes the data to/from a given linear buffer.
207
 *
208
 * Returns the number of bytes handles or -errno in case of an error. Short
209
 * reads are only returned if the end of the file is reached.
210
 */
211
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
212
{
213
    ssize_t offset = 0;
214
    ssize_t len;
215

    
216
    while (offset < aiocb->aio_nbytes) {
217
         if (aiocb->aio_type & QEMU_AIO_WRITE)
218
             len = pwrite(aiocb->aio_fildes,
219
                          (const char *)buf + offset,
220
                          aiocb->aio_nbytes - offset,
221
                          aiocb->aio_offset + offset);
222
         else
223
             len = pread(aiocb->aio_fildes,
224
                         buf + offset,
225
                         aiocb->aio_nbytes - offset,
226
                         aiocb->aio_offset + offset);
227

    
228
         if (len == -1 && errno == EINTR)
229
             continue;
230
         else if (len == -1) {
231
             offset = -errno;
232
             break;
233
         } else if (len == 0)
234
             break;
235

    
236
         offset += len;
237
    }
238

    
239
    return offset;
240
}
241

    
242
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
243
{
244
    ssize_t nbytes;
245
    char *buf;
246

    
247
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
248
        /*
249
         * If there is just a single buffer, and it is properly aligned
250
         * we can just use plain pread/pwrite without any problems.
251
         */
252
        if (aiocb->aio_niov == 1)
253
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
254

    
255
        /*
256
         * We have more than one iovec, and all are properly aligned.
257
         *
258
         * Try preadv/pwritev first and fall back to linearizing the
259
         * buffer if it's not supported.
260
         */
261
        if (preadv_present) {
262
            nbytes = handle_aiocb_rw_vector(aiocb);
263
            if (nbytes == aiocb->aio_nbytes)
264
                return nbytes;
265
            if (nbytes < 0 && nbytes != -ENOSYS)
266
                return nbytes;
267
            preadv_present = 0;
268
        }
269

    
270
        /*
271
         * XXX(hch): short read/write.  no easy way to handle the reminder
272
         * using these interfaces.  For now retry using plain
273
         * pread/pwrite?
274
         */
275
    }
276

    
277
    /*
278
     * Ok, we have to do it the hard way, copy all segments into
279
     * a single aligned buffer.
280
     */
281
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
282
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
283
        char *p = buf;
284
        int i;
285

    
286
        for (i = 0; i < aiocb->aio_niov; ++i) {
287
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
288
            p += aiocb->aio_iov[i].iov_len;
289
        }
290
    }
291

    
292
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
293
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
294
        char *p = buf;
295
        size_t count = aiocb->aio_nbytes, copy;
296
        int i;
297

    
298
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
299
            copy = count;
300
            if (copy > aiocb->aio_iov[i].iov_len)
301
                copy = aiocb->aio_iov[i].iov_len;
302
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
303
            p     += copy;
304
            count -= copy;
305
        }
306
    }
307
    qemu_vfree(buf);
308

    
309
    return nbytes;
310
}
311

    
312
/* Forward declaration: aio_thread() calls this before its definition. */
static void posix_aio_notify_event(void);
313

    
314
static void *aio_thread(void *unused)
315
{
316
    mutex_lock(&lock);
317
    pending_threads--;
318
    mutex_unlock(&lock);
319
    do_spawn_thread();
320

    
321
    while (1) {
322
        struct qemu_paiocb *aiocb;
323
        ssize_t ret = 0;
324
        qemu_timeval tv;
325
        struct timespec ts;
326

    
327
        qemu_gettimeofday(&tv);
328
        ts.tv_sec = tv.tv_sec + 10;
329
        ts.tv_nsec = 0;
330

    
331
        mutex_lock(&lock);
332

    
333
        while (QTAILQ_EMPTY(&request_list) &&
334
               !(ret == ETIMEDOUT)) {
335
            idle_threads++;
336
            ret = cond_timedwait(&cond, &lock, &ts);
337
            idle_threads--;
338
        }
339

    
340
        if (QTAILQ_EMPTY(&request_list))
341
            break;
342

    
343
        aiocb = QTAILQ_FIRST(&request_list);
344
        QTAILQ_REMOVE(&request_list, aiocb, node);
345
        aiocb->active = 1;
346
        mutex_unlock(&lock);
347

    
348
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
349
        case QEMU_AIO_READ:
350
            ret = handle_aiocb_rw(aiocb);
351
            if (ret >= 0 && ret < aiocb->aio_nbytes && aiocb->common.bs->growable) {
352
                /* A short read means that we have reached EOF. Pad the buffer
353
                 * with zeros for bytes after EOF. */
354
                QEMUIOVector qiov;
355

    
356
                qemu_iovec_init_external(&qiov, aiocb->aio_iov,
357
                                         aiocb->aio_niov);
358
                qemu_iovec_memset_skip(&qiov, 0, aiocb->aio_nbytes - ret, ret);
359

    
360
                ret = aiocb->aio_nbytes;
361
            }
362
            break;
363
        case QEMU_AIO_WRITE:
364
            ret = handle_aiocb_rw(aiocb);
365
            break;
366
        case QEMU_AIO_FLUSH:
367
            ret = handle_aiocb_flush(aiocb);
368
            break;
369
        case QEMU_AIO_IOCTL:
370
            ret = handle_aiocb_ioctl(aiocb);
371
            break;
372
        default:
373
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
374
            ret = -EINVAL;
375
            break;
376
        }
377

    
378
        mutex_lock(&lock);
379
        aiocb->ret = ret;
380
        mutex_unlock(&lock);
381

    
382
        posix_aio_notify_event();
383
    }
384

    
385
    cur_threads--;
386
    mutex_unlock(&lock);
387

    
388
    return NULL;
389
}
390

    
391
static void do_spawn_thread(void)
392
{
393
    sigset_t set, oldset;
394

    
395
    mutex_lock(&lock);
396
    if (!new_threads) {
397
        mutex_unlock(&lock);
398
        return;
399
    }
400

    
401
    new_threads--;
402
    pending_threads++;
403

    
404
    mutex_unlock(&lock);
405

    
406
    /* block all signals */
407
    if (sigfillset(&set)) die("sigfillset");
408
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
409

    
410
    thread_create(&thread_id, &attr, aio_thread, NULL);
411

    
412
    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
413
}
414

    
415
/*
 * Bottom-half callback registered in paio_init(): creates a backlogged
 * worker thread.  'opaque' is not used.
 */
static void spawn_thread_bh_fn(void *opaque)
{
    (void)opaque;
    do_spawn_thread();
}
419

    
420
static void spawn_thread(void)
421
{
422
    cur_threads++;
423
    new_threads++;
424
    /* If there are threads being created, they will spawn new workers, so
425
     * we don't spend time creating many threads in a loop holding a mutex or
426
     * starving the current vcpu.
427
     *
428
     * If there are no idle threads, ask the main thread to create one, so we
429
     * inherit the correct affinity instead of the vcpu affinity.
430
     */
431
    if (!pending_threads) {
432
        qemu_bh_schedule(new_thread_bh);
433
    }
434
}
435

    
436
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
437
{
438
    aiocb->ret = -EINPROGRESS;
439
    aiocb->active = 0;
440
    mutex_lock(&lock);
441
    if (idle_threads == 0 && cur_threads < max_threads)
442
        spawn_thread();
443
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
444
    mutex_unlock(&lock);
445
    cond_signal(&cond);
446
}
447

    
448
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
449
{
450
    ssize_t ret;
451

    
452
    mutex_lock(&lock);
453
    ret = aiocb->ret;
454
    mutex_unlock(&lock);
455

    
456
    return ret;
457
}
458

    
459
/*
 * Error state of a request as a positive errno value: the negated 'ret'
 * when it is negative (EINPROGRESS while the request is still running),
 * otherwise 0.
 */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t status = qemu_paio_return(aiocb);

    return status < 0 ? -status : 0;
}
470

    
471
static int posix_aio_process_queue(void *opaque)
472
{
473
    PosixAioState *s = opaque;
474
    struct qemu_paiocb *acb, **pacb;
475
    int ret;
476
    int result = 0;
477

    
478
    for(;;) {
479
        pacb = &s->first_aio;
480
        for(;;) {
481
            acb = *pacb;
482
            if (!acb)
483
                return result;
484

    
485
            ret = qemu_paio_error(acb);
486
            if (ret == ECANCELED) {
487
                /* remove the request */
488
                *pacb = acb->next;
489
                qemu_aio_release(acb);
490
                result = 1;
491
            } else if (ret != EINPROGRESS) {
492
                /* end of aio */
493
                if (ret == 0) {
494
                    ret = qemu_paio_return(acb);
495
                    if (ret == acb->aio_nbytes)
496
                        ret = 0;
497
                    else
498
                        ret = -EINVAL;
499
                } else {
500
                    ret = -ret;
501
                }
502

    
503
                trace_paio_complete(acb, acb->common.opaque, ret);
504

    
505
                /* remove the request */
506
                *pacb = acb->next;
507
                /* call the callback */
508
                acb->common.cb(acb->common.opaque, ret);
509
                qemu_aio_release(acb);
510
                result = 1;
511
                break;
512
            } else {
513
                pacb = &acb->next;
514
            }
515
        }
516
    }
517

    
518
    return result;
519
}
520

    
521
static void posix_aio_read(void *opaque)
522
{
523
    PosixAioState *s = opaque;
524
    ssize_t len;
525

    
526
    /* read all bytes from signal pipe */
527
    for (;;) {
528
        char bytes[16];
529

    
530
        len = read(s->rfd, bytes, sizeof(bytes));
531
        if (len == -1 && errno == EINTR)
532
            continue; /* try again */
533
        if (len == sizeof(bytes))
534
            continue; /* more to read */
535
        break;
536
    }
537

    
538
    posix_aio_process_queue(s);
539
}
540

    
541
static int posix_aio_flush(void *opaque)
542
{
543
    PosixAioState *s = opaque;
544
    return !!s->first_aio;
545
}
546

    
547
/* Single shared state object; assigned by paio_init() once setup is done. */
static PosixAioState *posix_aio_state;
548

    
549
static void posix_aio_notify_event(void)
550
{
551
    char byte = 0;
552
    ssize_t ret;
553

    
554
    ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
555
    if (ret < 0 && errno != EAGAIN)
556
        die("write()");
557
}
558

    
559
static void paio_remove(struct qemu_paiocb *acb)
560
{
561
    struct qemu_paiocb **pacb;
562

    
563
    /* remove the callback from the queue */
564
    pacb = &posix_aio_state->first_aio;
565
    for(;;) {
566
        if (*pacb == NULL) {
567
            fprintf(stderr, "paio_remove: aio request not found!\n");
568
            break;
569
        } else if (*pacb == acb) {
570
            *pacb = acb->next;
571
            qemu_aio_release(acb);
572
            break;
573
        }
574
        pacb = &(*pacb)->next;
575
    }
576
}
577

    
578
/*
 * Cancel callback of raw_aio_pool.
 *
 * A request no worker has picked up yet is removed from request_list and
 * marked -ECANCELED.  A request that is already being executed cannot be
 * stopped, so this function busy-waits until its result is available.
 * In both cases the request is finally removed from the outstanding list
 * and released.
 */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int must_wait = 0;

    trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (acb->active) {
        if (acb->ret == -EINPROGRESS) {
            must_wait = 1;
        }
    } else {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    }
    mutex_unlock(&lock);

    if (must_wait) {
        /* fail safe: if the aio could not be canceled, we wait for it */
        while (qemu_paio_error(acb) == EINPROGRESS) {
            /* spin */
        }
    }

    paio_remove(acb);
}
603

    
604
static AIOPool raw_aio_pool = {
605
    .aiocb_size         = sizeof(struct qemu_paiocb),
606
    .cancel             = paio_cancel,
607
};
608

    
609
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
610
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
611
        BlockDriverCompletionFunc *cb, void *opaque, int type)
612
{
613
    struct qemu_paiocb *acb;
614

    
615
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
616
    acb->aio_type = type;
617
    acb->aio_fildes = fd;
618

    
619
    if (qiov) {
620
        acb->aio_iov = qiov->iov;
621
        acb->aio_niov = qiov->niov;
622
    }
623
    acb->aio_nbytes = nb_sectors * 512;
624
    acb->aio_offset = sector_num * 512;
625

    
626
    acb->next = posix_aio_state->first_aio;
627
    posix_aio_state->first_aio = acb;
628

    
629
    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
630
    qemu_paio_submit(acb);
631
    return &acb->common;
632
}
633

    
634
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
635
        unsigned long int req, void *buf,
636
        BlockDriverCompletionFunc *cb, void *opaque)
637
{
638
    struct qemu_paiocb *acb;
639

    
640
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
641
    acb->aio_type = QEMU_AIO_IOCTL;
642
    acb->aio_fildes = fd;
643
    acb->aio_offset = 0;
644
    acb->aio_ioctl_buf = buf;
645
    acb->aio_ioctl_cmd = req;
646

    
647
    acb->next = posix_aio_state->first_aio;
648
    posix_aio_state->first_aio = acb;
649

    
650
    qemu_paio_submit(acb);
651
    return &acb->common;
652
}
653

    
654
int paio_init(void)
655
{
656
    PosixAioState *s;
657
    int fds[2];
658
    int ret;
659

    
660
    if (posix_aio_state)
661
        return 0;
662

    
663
    s = g_malloc(sizeof(PosixAioState));
664

    
665
    s->first_aio = NULL;
666
    if (qemu_pipe(fds) == -1) {
667
        fprintf(stderr, "failed to create pipe\n");
668
        g_free(s);
669
        return -1;
670
    }
671

    
672
    s->rfd = fds[0];
673
    s->wfd = fds[1];
674

    
675
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
676
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
677

    
678
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
679
        posix_aio_process_queue, s);
680

    
681
    ret = pthread_attr_init(&attr);
682
    if (ret)
683
        die2(ret, "pthread_attr_init");
684

    
685
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
686
    if (ret)
687
        die2(ret, "pthread_attr_setdetachstate");
688

    
689
    QTAILQ_INIT(&request_list);
690
    new_thread_bh = qemu_bh_new(spawn_thread_bh_fn, NULL);
691

    
692
    posix_aio_state = s;
693
    return 0;
694
}