Revision 9ef91a67

b/block/raw-posix-aio.h
1
/*
2
 * QEMU Posix block I/O backend AIO support
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13
#ifndef QEMU_RAW_POSIX_AIO_H
14
#define QEMU_RAW_POSIX_AIO_H
15

  
16
/* AIO request types */
17
#define QEMU_AIO_READ         0x0001
18
#define QEMU_AIO_WRITE        0x0002
19
#define QEMU_AIO_IOCTL        0x0004
20
#define QEMU_AIO_TYPE_MASK \
21
	(QEMU_AIO_READ|QEMU_AIO_WRITE|QEMU_AIO_IOCTL)
22

  
23
/* AIO flags */
24
#define QEMU_AIO_MISALIGNED   0x1000
25

  
26

  
27
/* posix-aio-compat.c - thread pool based implementation */
28
void *paio_init(void);
29
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
30
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
31
        BlockDriverCompletionFunc *cb, void *opaque, int type);
32
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
33
        unsigned long int req, void *buf,
34
        BlockDriverCompletionFunc *cb, void *opaque);
35

  
36
#endif /* QEMU_RAW_POSIX_AIO_H */
b/block/raw-posix.c
27 27
#include "qemu-log.h"
28 28
#include "block_int.h"
29 29
#include "module.h"
30
#include "posix-aio-compat.h"
30
#include "block/raw-posix-aio.h"
31 31

  
32 32
#ifdef CONFIG_COCOA
33 33
#include <paths.h>
......
107 107
    int type;
108 108
    unsigned int lseek_err_cnt;
109 109
    int open_flags;
110
    void *aio_ctx;
110 111
#if defined(__linux__)
111 112
    /* linux floppy specific */
112 113
    int64_t fd_open_time;
......
117 118
    uint8_t* aligned_buf;
118 119
} BDRVRawState;
119 120

  
120
static int posix_aio_init(void);
121

  
122 121
static int fd_open(BlockDriverState *bs);
123 122
static int64_t raw_getlength(BlockDriverState *bs);
124 123

  
......
132 131
    BDRVRawState *s = bs->opaque;
133 132
    int fd, ret;
134 133

  
135
    posix_aio_init();
136

  
137 134
    s->lseek_err_cnt = 0;
138 135

  
139 136
    s->open_flags = open_flags | O_BINARY;
......
165 162
    if ((bdrv_flags & BDRV_O_NOCACHE)) {
166 163
        s->aligned_buf = qemu_blockalign(bs, ALIGNED_BUFFER_SIZE);
167 164
        if (s->aligned_buf == NULL) {
168
            ret = -errno;
169
            close(fd);
170
            return ret;
165
            goto out_close;
171 166
        }
172 167
    }
168

  
169
    s->aio_ctx = paio_init();
170
    if (!s->aio_ctx) {
171
        goto out_free_buf;
172
    }
173

  
173 174
    return 0;
175

  
176
out_free_buf:
177
    qemu_vfree(s->aligned_buf);
178
out_close:
179
    close(fd);
180
    return -errno;
174 181
}
175 182

  
176 183
static int raw_open(BlockDriverState *bs, const char *filename, int flags)
......
487 494
    return ret;
488 495
}
489 496

  
490
/***********************************************************/
491
/* Unix AIO using POSIX AIO */
492

  
493
typedef struct RawAIOCB {
494
    BlockDriverAIOCB common;
495
    struct qemu_paiocb aiocb;
496
    struct RawAIOCB *next;
497
    int ret;
498
} RawAIOCB;
499

  
500
typedef struct PosixAioState
501
{
502
    int rfd, wfd;
503
    RawAIOCB *first_aio;
504
} PosixAioState;
505

  
506
static void posix_aio_read(void *opaque)
507
{
508
    PosixAioState *s = opaque;
509
    RawAIOCB *acb, **pacb;
510
    int ret;
511
    ssize_t len;
512

  
513
    /* read all bytes from signal pipe */
514
    for (;;) {
515
        char bytes[16];
516

  
517
        len = read(s->rfd, bytes, sizeof(bytes));
518
        if (len == -1 && errno == EINTR)
519
            continue; /* try again */
520
        if (len == sizeof(bytes))
521
            continue; /* more to read */
522
        break;
523
    }
524

  
525
    for(;;) {
526
        pacb = &s->first_aio;
527
        for(;;) {
528
            acb = *pacb;
529
            if (!acb)
530
                goto the_end;
531
            ret = qemu_paio_error(&acb->aiocb);
532
            if (ret == ECANCELED) {
533
                /* remove the request */
534
                *pacb = acb->next;
535
                qemu_aio_release(acb);
536
            } else if (ret != EINPROGRESS) {
537
                /* end of aio */
538
                if (ret == 0) {
539
                    ret = qemu_paio_return(&acb->aiocb);
540
                    if (ret == acb->aiocb.aio_nbytes)
541
                        ret = 0;
542
                    else
543
                        ret = -EINVAL;
544
                } else {
545
                    ret = -ret;
546
                }
547
                /* remove the request */
548
                *pacb = acb->next;
549
                /* call the callback */
550
                acb->common.cb(acb->common.opaque, ret);
551
                qemu_aio_release(acb);
552
                break;
553
            } else {
554
                pacb = &acb->next;
555
            }
556
        }
557
    }
558
 the_end: ;
559
}
560

  
561
static int posix_aio_flush(void *opaque)
562
{
563
    PosixAioState *s = opaque;
564
    return !!s->first_aio;
565
}
566

  
567
static PosixAioState *posix_aio_state;
568

  
569
static void aio_signal_handler(int signum)
570
{
571
    if (posix_aio_state) {
572
        char byte = 0;
573

  
574
        write(posix_aio_state->wfd, &byte, sizeof(byte));
575
    }
576

  
577
    qemu_service_io();
578
}
579

  
580
static int posix_aio_init(void)
497
/*
498
 * Check if all memory in this vector is sector aligned.
499
 */
500
static int qiov_is_aligned(QEMUIOVector *qiov)
581 501
{
582
    struct sigaction act;
583
    PosixAioState *s;
584
    int fds[2];
585
    struct qemu_paioinit ai;
586
  
587
    if (posix_aio_state)
588
        return 0;
589

  
590
    s = qemu_malloc(sizeof(PosixAioState));
591

  
592
    sigfillset(&act.sa_mask);
593
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
594
    act.sa_handler = aio_signal_handler;
595
    sigaction(SIGUSR2, &act, NULL);
596

  
597
    s->first_aio = NULL;
598
    if (pipe(fds) == -1) {
599
        fprintf(stderr, "failed to create pipe\n");
600
        return -errno;
601
    }
602

  
603
    s->rfd = fds[0];
604
    s->wfd = fds[1];
605

  
606
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
607
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
608

  
609
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
610

  
611
    memset(&ai, 0, sizeof(ai));
612
    ai.aio_threads = 64;
613
    ai.aio_num = 64;
614
    qemu_paio_init(&ai);
615

  
616
    posix_aio_state = s;
617

  
618
    return 0;
619
}
502
    int i;
620 503

  
621
static void raw_aio_remove(RawAIOCB *acb)
622
{
623
    RawAIOCB **pacb;
624

  
625
    /* remove the callback from the queue */
626
    pacb = &posix_aio_state->first_aio;
627
    for(;;) {
628
        if (*pacb == NULL) {
629
            fprintf(stderr, "raw_aio_remove: aio request not found!\n");
630
            break;
631
        } else if (*pacb == acb) {
632
            *pacb = acb->next;
633
            qemu_aio_release(acb);
634
            break;
504
    for (i = 0; i < qiov->niov; i++) {
505
        if ((uintptr_t) qiov->iov[i].iov_base % 512) {
506
            return 0;
635 507
        }
636
        pacb = &(*pacb)->next;
637 508
    }
638
}
639

  
640
static void raw_aio_cancel(BlockDriverAIOCB *blockacb)
641
{
642
    int ret;
643
    RawAIOCB *acb = (RawAIOCB *)blockacb;
644 509

  
645
    ret = qemu_paio_cancel(acb->aiocb.aio_fildes, &acb->aiocb);
646
    if (ret == QEMU_PAIO_NOTCANCELED) {
647
        /* fail safe: if the aio could not be canceled, we wait for
648
           it */
649
        while (qemu_paio_error(&acb->aiocb) == EINPROGRESS);
650
    }
651

  
652
    raw_aio_remove(acb);
510
    return 1;
653 511
}
654 512

  
655
static AIOPool raw_aio_pool = {
656
    .aiocb_size         = sizeof(RawAIOCB),
657
    .cancel             = raw_aio_cancel,
658
};
659

  
660
static RawAIOCB *raw_aio_setup(BlockDriverState *bs, int64_t sector_num,
661
        QEMUIOVector *qiov, int nb_sectors,
662
        BlockDriverCompletionFunc *cb, void *opaque)
513
static BlockDriverAIOCB *raw_aio_submit(BlockDriverState *bs,
514
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
515
        BlockDriverCompletionFunc *cb, void *opaque, int type)
663 516
{
664 517
    BDRVRawState *s = bs->opaque;
665
    RawAIOCB *acb;
666 518

  
667 519
    if (fd_open(bs) < 0)
668 520
        return NULL;
669 521

  
670
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
671
    if (!acb)
672
        return NULL;
673
    acb->aiocb.aio_fildes = s->fd;
674
    acb->aiocb.ev_signo = SIGUSR2;
675
    acb->aiocb.aio_iov = qiov->iov;
676
    acb->aiocb.aio_niov = qiov->niov;
677
    acb->aiocb.aio_nbytes = nb_sectors * 512;
678
    acb->aiocb.aio_offset = sector_num * 512;
679
    acb->aiocb.aio_flags = 0;
680

  
681 522
    /*
682 523
     * If O_DIRECT is used the buffer needs to be aligned on a sector
683
     * boundary. Tell the low level code to ensure that in case it's
684
     * not done yet.
524
     * boundary.  Check if this is the case or telll the low-level
525
     * driver that it needs to copy the buffer.
685 526
     */
686
    if (s->aligned_buf)
687
        acb->aiocb.aio_flags |= QEMU_AIO_SECTOR_ALIGNED;
527
    if (s->aligned_buf && !qiov_is_aligned(qiov)) {
528
        type |= QEMU_AIO_MISALIGNED;
529
    }
688 530

  
689
    acb->next = posix_aio_state->first_aio;
690
    posix_aio_state->first_aio = acb;
691
    return acb;
531
    return paio_submit(bs, s->aio_ctx, s->fd, sector_num, qiov, nb_sectors,
532
                       cb, opaque, type);
692 533
}
693 534

  
694 535
static BlockDriverAIOCB *raw_aio_readv(BlockDriverState *bs,
695 536
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
696 537
        BlockDriverCompletionFunc *cb, void *opaque)
697 538
{
698
    RawAIOCB *acb;
699

  
700
    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque);
701
    if (!acb)
702
        return NULL;
703
    if (qemu_paio_read(&acb->aiocb) < 0) {
704
        raw_aio_remove(acb);
705
        return NULL;
706
    }
707
    return &acb->common;
539
    return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
540
                          cb, opaque, QEMU_AIO_READ);
708 541
}
709 542

  
710 543
static BlockDriverAIOCB *raw_aio_writev(BlockDriverState *bs,
711 544
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
712 545
        BlockDriverCompletionFunc *cb, void *opaque)
713 546
{
714
    RawAIOCB *acb;
715

  
716
    acb = raw_aio_setup(bs, sector_num, qiov, nb_sectors, cb, opaque);
717
    if (!acb)
718
        return NULL;
719
    if (qemu_paio_write(&acb->aiocb) < 0) {
720
        raw_aio_remove(acb);
721
        return NULL;
722
    }
723
    return &acb->common;
547
    return raw_aio_submit(bs, sector_num, qiov, nb_sectors,
548
                          cb, opaque, QEMU_AIO_WRITE);
724 549
}
725 550

  
726 551
static void raw_close(BlockDriverState *bs)
......
1085 910
        BlockDriverCompletionFunc *cb, void *opaque)
1086 911
{
1087 912
    BDRVRawState *s = bs->opaque;
1088
    RawAIOCB *acb;
1089 913

  
1090 914
    if (fd_open(bs) < 0)
1091 915
        return NULL;
1092

  
1093
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
1094
    if (!acb)
1095
        return NULL;
1096
    acb->aiocb.aio_fildes = s->fd;
1097
    acb->aiocb.ev_signo = SIGUSR2;
1098
    acb->aiocb.aio_offset = 0;
1099
    acb->aiocb.aio_flags = 0;
1100

  
1101
    acb->next = posix_aio_state->first_aio;
1102
    posix_aio_state->first_aio = acb;
1103

  
1104
    acb->aiocb.aio_ioctl_buf = buf;
1105
    acb->aiocb.aio_ioctl_cmd = req;
1106
    if (qemu_paio_ioctl(&acb->aiocb) < 0) {
1107
        raw_aio_remove(acb);
1108
        return NULL;
1109
    }
1110

  
1111
    return &acb->common;
916
    return paio_ioctl(bs, s->fd, req, buf, cb, opaque);
1112 917
}
1113 918

  
1114 919
#elif defined(__FreeBSD__)
......
1189 994
    BDRVRawState *s = bs->opaque;
1190 995
    int ret;
1191 996

  
1192
    posix_aio_init();
1193

  
1194 997
    s->type = FTYPE_FD;
1195 998

  
1196 999
    /* open will not fail even if no floppy is inserted, so add O_NONBLOCK */
b/posix-aio-compat.c
12 12
 */
13 13

  
14 14
#include <sys/ioctl.h>
15
#include <sys/types.h>
15 16
#include <pthread.h>
16 17
#include <unistd.h>
17 18
#include <errno.h>
18 19
#include <time.h>
20
#include <signal.h>
19 21
#include <string.h>
20 22
#include <stdlib.h>
21 23
#include <stdio.h>
24

  
25
#include "sys-queue.h"
22 26
#include "osdep.h"
23 27
#include "qemu-common.h"
28
#include "block_int.h"
29

  
30
#include "block/raw-posix-aio.h"
31

  
32

  
33
struct qemu_paiocb {
34
    BlockDriverAIOCB common;
35
    int aio_fildes;
36
    union {
37
        struct iovec *aio_iov;
38
	void *aio_ioctl_buf;
39
    };
40
    int aio_niov;
41
    size_t aio_nbytes;
42
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
43
    int ev_signo;
44
    off_t aio_offset;
45

  
46
    TAILQ_ENTRY(qemu_paiocb) node;
47
    int aio_type;
48
    ssize_t ret;
49
    int active;
50
    struct qemu_paiocb *next;
51
};
52

  
53
typedef struct PosixAioState {
54
    int rfd, wfd;
55
    struct qemu_paiocb *first_aio;
56
} PosixAioState;
24 57

  
25
#include "posix-aio-compat.h"
26 58

  
27 59
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
28 60
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
......
132 164

  
133 165
#endif
134 166

  
135
/*
136
 * Check if we need to copy the data in the aiocb into a new
137
 * properly aligned buffer.
138
 */
139
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
140
{
141
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
142
        int i;
143

  
144
        for (i = 0; i < aiocb->aio_niov; i++)
145
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
146
                return 1;
147
    }
148

  
149
    return 0;
150
}
151

  
152 167
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
153 168
{
154 169
    size_t offset = 0;
155 170
    ssize_t len;
156 171

  
157 172
    do {
158
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
173
        if (aiocb->aio_type & QEMU_AIO_WRITE)
159 174
            len = qemu_pwritev(aiocb->aio_fildes,
160 175
                               aiocb->aio_iov,
161 176
                               aiocb->aio_niov,
......
178 193
    size_t len;
179 194

  
180 195
    while (offset < aiocb->aio_nbytes) {
181
         if (aiocb->aio_type == QEMU_PAIO_WRITE)
196
         if (aiocb->aio_type & QEMU_AIO_WRITE)
182 197
             len = pwrite(aiocb->aio_fildes,
183 198
                          (const char *)buf + offset,
184 199
                          aiocb->aio_nbytes - offset,
......
208 223
    size_t nbytes;
209 224
    char *buf;
210 225

  
211
    if (!aiocb_needs_copy(aiocb)) {
226
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
212 227
        /*
213 228
         * If there is just a single buffer, and it is properly aligned
214 229
         * we can just use plain pread/pwrite without any problems.
......
243 258
     * a single aligned buffer.
244 259
     */
245 260
    buf = qemu_memalign(512, aiocb->aio_nbytes);
246
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
261
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
247 262
        char *p = buf;
248 263
        int i;
249 264

  
......
254 269
    }
255 270

  
256 271
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
257
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
272
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
258 273
        char *p = buf;
259 274
        size_t count = aiocb->aio_nbytes, copy;
260 275
        int i;
......
310 325
        idle_threads--;
311 326
        mutex_unlock(&lock);
312 327

  
313
        switch (aiocb->aio_type) {
314
        case QEMU_PAIO_READ:
315
        case QEMU_PAIO_WRITE:
328
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
329
        case QEMU_AIO_READ:
330
        case QEMU_AIO_WRITE:
316 331
		ret = handle_aiocb_rw(aiocb);
317 332
		break;
318
        case QEMU_PAIO_IOCTL:
333
        case QEMU_AIO_IOCTL:
319 334
		ret = handle_aiocb_ioctl(aiocb);
320 335
		break;
321 336
	default:
......
346 361
    thread_create(&thread_id, &attr, aio_thread, NULL);
347 362
}
348 363

  
349
int qemu_paio_init(struct qemu_paioinit *aioinit)
364
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
350 365
{
351
    int ret;
352

  
353
    ret = pthread_attr_init(&attr);
354
    if (ret) die2(ret, "pthread_attr_init");
355

  
356
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
357
    if (ret) die2(ret, "pthread_attr_setdetachstate");
358

  
359
    TAILQ_INIT(&request_list);
360

  
361
    return 0;
362
}
363

  
364
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
365
{
366
    aiocb->aio_type = type;
367 366
    aiocb->ret = -EINPROGRESS;
368 367
    aiocb->active = 0;
369 368
    mutex_lock(&lock);
......
372 371
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
373 372
    mutex_unlock(&lock);
374 373
    cond_signal(&cond);
375

  
376
    return 0;
377 374
}
378 375

  
379
int qemu_paio_read(struct qemu_paiocb *aiocb)
380
{
381
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
382
}
383

  
384
int qemu_paio_write(struct qemu_paiocb *aiocb)
385
{
386
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
387
}
388

  
389
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
390
{
391
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
392
}
393

  
394
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
376
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
395 377
{
396 378
    ssize_t ret;
397 379

  
......
402 384
    return ret;
403 385
}
404 386

  
405
int qemu_paio_error(struct qemu_paiocb *aiocb)
387
static int qemu_paio_error(struct qemu_paiocb *aiocb)
406 388
{
407 389
    ssize_t ret = qemu_paio_return(aiocb);
408 390

  
......
414 396
    return ret;
415 397
}
416 398

  
417
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
399
static void posix_aio_read(void *opaque)
418 400
{
401
    PosixAioState *s = opaque;
402
    struct qemu_paiocb *acb, **pacb;
419 403
    int ret;
404
    ssize_t len;
405

  
406
    /* read all bytes from signal pipe */
407
    for (;;) {
408
        char bytes[16];
409

  
410
        len = read(s->rfd, bytes, sizeof(bytes));
411
        if (len == -1 && errno == EINTR)
412
            continue; /* try again */
413
        if (len == sizeof(bytes))
414
            continue; /* more to read */
415
        break;
416
    }
417

  
418
    for(;;) {
419
        pacb = &s->first_aio;
420
        for(;;) {
421
            acb = *pacb;
422
            if (!acb)
423
                goto the_end;
424
            ret = qemu_paio_error(acb);
425
            if (ret == ECANCELED) {
426
                /* remove the request */
427
                *pacb = acb->next;
428
                qemu_aio_release(acb);
429
            } else if (ret != EINPROGRESS) {
430
                /* end of aio */
431
                if (ret == 0) {
432
                    ret = qemu_paio_return(acb);
433
                    if (ret == acb->aio_nbytes)
434
                        ret = 0;
435
                    else
436
                        ret = -EINVAL;
437
                } else {
438
                    ret = -ret;
439
                }
440
                /* remove the request */
441
                *pacb = acb->next;
442
                /* call the callback */
443
                acb->common.cb(acb->common.opaque, ret);
444
                qemu_aio_release(acb);
445
                break;
446
            } else {
447
                pacb = &acb->next;
448
            }
449
        }
450
    }
451
 the_end: ;
452
}
453

  
454
static int posix_aio_flush(void *opaque)
455
{
456
    PosixAioState *s = opaque;
457
    return !!s->first_aio;
458
}
459

  
460
static PosixAioState *posix_aio_state;
461

  
462
static void aio_signal_handler(int signum)
463
{
464
    if (posix_aio_state) {
465
        char byte = 0;
466

  
467
        write(posix_aio_state->wfd, &byte, sizeof(byte));
468
    }
469

  
470
    qemu_service_io();
471
}
472

  
473
static void paio_remove(struct qemu_paiocb *acb)
474
{
475
    struct qemu_paiocb **pacb;
476

  
477
    /* remove the callback from the queue */
478
    pacb = &posix_aio_state->first_aio;
479
    for(;;) {
480
        if (*pacb == NULL) {
481
            fprintf(stderr, "paio_remove: aio request not found!\n");
482
            break;
483
        } else if (*pacb == acb) {
484
            *pacb = acb->next;
485
            qemu_aio_release(acb);
486
            break;
487
        }
488
        pacb = &(*pacb)->next;
489
    }
490
}
491

  
492
static void paio_cancel(BlockDriverAIOCB *blockacb)
493
{
494
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
495
    int active = 0;
420 496

  
421 497
    mutex_lock(&lock);
422
    if (!aiocb->active) {
423
        TAILQ_REMOVE(&request_list, aiocb, node);
424
        aiocb->ret = -ECANCELED;
425
        ret = QEMU_PAIO_CANCELED;
426
    } else if (aiocb->ret == -EINPROGRESS)
427
        ret = QEMU_PAIO_NOTCANCELED;
428
    else
429
        ret = QEMU_PAIO_ALLDONE;
498
    if (!acb->active) {
499
        TAILQ_REMOVE(&request_list, acb, node);
500
        acb->ret = -ECANCELED;
501
    } else if (acb->ret == -EINPROGRESS) {
502
        active = 1;
503
    }
430 504
    mutex_unlock(&lock);
431 505

  
432
    return ret;
506
    if (active) {
507
        /* fail safe: if the aio could not be canceled, we wait for
508
           it */
509
        while (qemu_paio_error(acb) == EINPROGRESS)
510
            ;
511
    }
512

  
513
    paio_remove(acb);
514
}
515

  
516
static AIOPool raw_aio_pool = {
517
    .aiocb_size         = sizeof(struct qemu_paiocb),
518
    .cancel             = paio_cancel,
519
};
520

  
521
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
522
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
523
        BlockDriverCompletionFunc *cb, void *opaque, int type)
524
{
525
    struct qemu_paiocb *acb;
526

  
527
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
528
    if (!acb)
529
        return NULL;
530
    acb->aio_type = type;
531
    acb->aio_fildes = fd;
532
    acb->ev_signo = SIGUSR2;
533
    acb->aio_iov = qiov->iov;
534
    acb->aio_niov = qiov->niov;
535
    acb->aio_nbytes = nb_sectors * 512;
536
    acb->aio_offset = sector_num * 512;
537

  
538
    acb->next = posix_aio_state->first_aio;
539
    posix_aio_state->first_aio = acb;
540

  
541
    qemu_paio_submit(acb);
542
    return &acb->common;
543
}
544

  
545
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
546
        unsigned long int req, void *buf,
547
        BlockDriverCompletionFunc *cb, void *opaque)
548
{
549
    struct qemu_paiocb *acb;
550

  
551
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
552
    if (!acb)
553
        return NULL;
554
    acb->aio_type = QEMU_AIO_IOCTL;
555
    acb->aio_fildes = fd;
556
    acb->ev_signo = SIGUSR2;
557
    acb->aio_offset = 0;
558
    acb->aio_ioctl_buf = buf;
559
    acb->aio_ioctl_cmd = req;
560

  
561
    acb->next = posix_aio_state->first_aio;
562
    posix_aio_state->first_aio = acb;
563

  
564
    qemu_paio_submit(acb);
565
    return &acb->common;
566
}
567

  
568
void *paio_init(void)
569
{
570
    struct sigaction act;
571
    PosixAioState *s;
572
    int fds[2];
573
    int ret;
574

  
575
    if (posix_aio_state)
576
        return posix_aio_state;
577

  
578
    s = qemu_malloc(sizeof(PosixAioState));
579

  
580
    sigfillset(&act.sa_mask);
581
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
582
    act.sa_handler = aio_signal_handler;
583
    sigaction(SIGUSR2, &act, NULL);
584

  
585
    s->first_aio = NULL;
586
    if (pipe(fds) == -1) {
587
        fprintf(stderr, "failed to create pipe\n");
588
        return NULL;
589
    }
590

  
591
    s->rfd = fds[0];
592
    s->wfd = fds[1];
593

  
594
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
595
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
596

  
597
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
598

  
599
    ret = pthread_attr_init(&attr);
600
    if (ret)
601
        die2(ret, "pthread_attr_init");
602

  
603
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
604
    if (ret)
605
        die2(ret, "pthread_attr_setdetachstate");
606

  
607
    TAILQ_INIT(&request_list);
608

  
609
    posix_aio_state = s;
610

  
611
    return posix_aio_state;
433 612
}
/dev/null
1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

  
14
#ifndef QEMU_POSIX_AIO_COMPAT_H
15
#define QEMU_POSIX_AIO_COMPAT_H
16

  
17
#include <sys/types.h>
18
#include <unistd.h>
19
#include <signal.h>
20

  
21
#include "sys-queue.h"
22

  
23
#define QEMU_PAIO_CANCELED     0x01
24
#define QEMU_PAIO_NOTCANCELED  0x02
25
#define QEMU_PAIO_ALLDONE      0x03
26

  
27
struct qemu_paiocb
28
{
29
    int aio_fildes;
30
    union {
31
        struct iovec *aio_iov;
32
	void *aio_ioctl_buf;
33
    };
34
    int aio_niov;
35
    size_t aio_nbytes;
36
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_PAIO_IOCTL */
37
    int ev_signo;
38
    off_t aio_offset;
39
    unsigned aio_flags;
40
/* 512 byte alignment required for buffer, offset and length */
41
#define QEMU_AIO_SECTOR_ALIGNED	0x01
42

  
43
    /* private */
44
    TAILQ_ENTRY(qemu_paiocb) node;
45
    int aio_type;
46
#define QEMU_PAIO_READ         0x01
47
#define QEMU_PAIO_WRITE        0x02
48
#define QEMU_PAIO_IOCTL        0x03
49
    ssize_t ret;
50
    int active;
51
};
52

  
53
struct qemu_paioinit
54
{
55
    unsigned int aio_threads;
56
    unsigned int aio_num;
57
    unsigned int aio_idle_time;
58
};
59

  
60
int qemu_paio_init(struct qemu_paioinit *aioinit);
61
int qemu_paio_read(struct qemu_paiocb *aiocb);
62
int qemu_paio_write(struct qemu_paiocb *aiocb);
63
int qemu_paio_ioctl(struct qemu_paiocb *aiocb);
64
int qemu_paio_error(struct qemu_paiocb *aiocb);
65
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb);
66
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb);
67

  
68
#endif

Also available in: Unified diff