Revision 9ef91a67 posix-aio-compat.c

b/posix-aio-compat.c
12 12
 */
13 13

  
14 14
#include <sys/ioctl.h>
15
#include <sys/types.h>
15 16
#include <pthread.h>
16 17
#include <unistd.h>
17 18
#include <errno.h>
18 19
#include <time.h>
20
#include <signal.h>
19 21
#include <string.h>
20 22
#include <stdlib.h>
21 23
#include <stdio.h>
24

  
25
#include "sys-queue.h"
22 26
#include "osdep.h"
23 27
#include "qemu-common.h"
28
#include "block_int.h"
29

  
30
#include "block/raw-posix-aio.h"
31

  
32

  
33
struct qemu_paiocb {
34
    BlockDriverAIOCB common;
35
    int aio_fildes;
36
    union {
37
        struct iovec *aio_iov;
38
	void *aio_ioctl_buf;
39
    };
40
    int aio_niov;
41
    size_t aio_nbytes;
42
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
43
    int ev_signo;
44
    off_t aio_offset;
45

  
46
    TAILQ_ENTRY(qemu_paiocb) node;
47
    int aio_type;
48
    ssize_t ret;
49
    int active;
50
    struct qemu_paiocb *next;
51
};
52

  
53
typedef struct PosixAioState {
54
    int rfd, wfd;
55
    struct qemu_paiocb *first_aio;
56
} PosixAioState;
24 57

  
25
#include "posix-aio-compat.h"
26 58

  
27 59
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
28 60
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
......
132 164

  
133 165
#endif
134 166

  
135
/*
136
 * Check if we need to copy the data in the aiocb into a new
137
 * properly aligned buffer.
138
 */
139
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
140
{
141
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
142
        int i;
143

  
144
        for (i = 0; i < aiocb->aio_niov; i++)
145
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
146
                return 1;
147
    }
148

  
149
    return 0;
150
}
151

  
152 167
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
153 168
{
154 169
    size_t offset = 0;
155 170
    ssize_t len;
156 171

  
157 172
    do {
158
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
173
        if (aiocb->aio_type & QEMU_AIO_WRITE)
159 174
            len = qemu_pwritev(aiocb->aio_fildes,
160 175
                               aiocb->aio_iov,
161 176
                               aiocb->aio_niov,
......
178 193
    size_t len;
179 194

  
180 195
    while (offset < aiocb->aio_nbytes) {
181
         if (aiocb->aio_type == QEMU_PAIO_WRITE)
196
         if (aiocb->aio_type & QEMU_AIO_WRITE)
182 197
             len = pwrite(aiocb->aio_fildes,
183 198
                          (const char *)buf + offset,
184 199
                          aiocb->aio_nbytes - offset,
......
208 223
    size_t nbytes;
209 224
    char *buf;
210 225

  
211
    if (!aiocb_needs_copy(aiocb)) {
226
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
212 227
        /*
213 228
         * If there is just a single buffer, and it is properly aligned
214 229
         * we can just use plain pread/pwrite without any problems.
......
243 258
     * a single aligned buffer.
244 259
     */
245 260
    buf = qemu_memalign(512, aiocb->aio_nbytes);
246
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
261
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
247 262
        char *p = buf;
248 263
        int i;
249 264

  
......
254 269
    }
255 270

  
256 271
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
257
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
272
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
258 273
        char *p = buf;
259 274
        size_t count = aiocb->aio_nbytes, copy;
260 275
        int i;
......
310 325
        idle_threads--;
311 326
        mutex_unlock(&lock);
312 327

  
313
        switch (aiocb->aio_type) {
314
        case QEMU_PAIO_READ:
315
        case QEMU_PAIO_WRITE:
328
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
329
        case QEMU_AIO_READ:
330
        case QEMU_AIO_WRITE:
316 331
		ret = handle_aiocb_rw(aiocb);
317 332
		break;
318
        case QEMU_PAIO_IOCTL:
333
        case QEMU_AIO_IOCTL:
319 334
		ret = handle_aiocb_ioctl(aiocb);
320 335
		break;
321 336
	default:
......
346 361
    thread_create(&thread_id, &attr, aio_thread, NULL);
347 362
}
348 363

  
349
int qemu_paio_init(struct qemu_paioinit *aioinit)
364
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
350 365
{
351
    int ret;
352

  
353
    ret = pthread_attr_init(&attr);
354
    if (ret) die2(ret, "pthread_attr_init");
355

  
356
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
357
    if (ret) die2(ret, "pthread_attr_setdetachstate");
358

  
359
    TAILQ_INIT(&request_list);
360

  
361
    return 0;
362
}
363

  
364
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
365
{
366
    aiocb->aio_type = type;
367 366
    aiocb->ret = -EINPROGRESS;
368 367
    aiocb->active = 0;
369 368
    mutex_lock(&lock);
......
372 371
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
373 372
    mutex_unlock(&lock);
374 373
    cond_signal(&cond);
375

  
376
    return 0;
377 374
}
378 375

  
379
int qemu_paio_read(struct qemu_paiocb *aiocb)
380
{
381
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
382
}
383

  
384
int qemu_paio_write(struct qemu_paiocb *aiocb)
385
{
386
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
387
}
388

  
389
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
390
{
391
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
392
}
393

  
394
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
376
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
395 377
{
396 378
    ssize_t ret;
397 379

  
......
402 384
    return ret;
403 385
}
404 386

  
405
int qemu_paio_error(struct qemu_paiocb *aiocb)
387
static int qemu_paio_error(struct qemu_paiocb *aiocb)
406 388
{
407 389
    ssize_t ret = qemu_paio_return(aiocb);
408 390

  
......
414 396
    return ret;
415 397
}
416 398

  
417
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
399
static void posix_aio_read(void *opaque)
418 400
{
401
    PosixAioState *s = opaque;
402
    struct qemu_paiocb *acb, **pacb;
419 403
    int ret;
404
    ssize_t len;
405

  
406
    /* read all bytes from signal pipe */
407
    for (;;) {
408
        char bytes[16];
409

  
410
        len = read(s->rfd, bytes, sizeof(bytes));
411
        if (len == -1 && errno == EINTR)
412
            continue; /* try again */
413
        if (len == sizeof(bytes))
414
            continue; /* more to read */
415
        break;
416
    }
417

  
418
    for(;;) {
419
        pacb = &s->first_aio;
420
        for(;;) {
421
            acb = *pacb;
422
            if (!acb)
423
                goto the_end;
424
            ret = qemu_paio_error(acb);
425
            if (ret == ECANCELED) {
426
                /* remove the request */
427
                *pacb = acb->next;
428
                qemu_aio_release(acb);
429
            } else if (ret != EINPROGRESS) {
430
                /* end of aio */
431
                if (ret == 0) {
432
                    ret = qemu_paio_return(acb);
433
                    if (ret == acb->aio_nbytes)
434
                        ret = 0;
435
                    else
436
                        ret = -EINVAL;
437
                } else {
438
                    ret = -ret;
439
                }
440
                /* remove the request */
441
                *pacb = acb->next;
442
                /* call the callback */
443
                acb->common.cb(acb->common.opaque, ret);
444
                qemu_aio_release(acb);
445
                break;
446
            } else {
447
                pacb = &acb->next;
448
            }
449
        }
450
    }
451
 the_end: ;
452
}
453

  
454
static int posix_aio_flush(void *opaque)
455
{
456
    PosixAioState *s = opaque;
457
    return !!s->first_aio;
458
}
459

  
460
static PosixAioState *posix_aio_state;
461

  
462
static void aio_signal_handler(int signum)
463
{
464
    if (posix_aio_state) {
465
        char byte = 0;
466

  
467
        write(posix_aio_state->wfd, &byte, sizeof(byte));
468
    }
469

  
470
    qemu_service_io();
471
}
472

  
473
static void paio_remove(struct qemu_paiocb *acb)
474
{
475
    struct qemu_paiocb **pacb;
476

  
477
    /* remove the callback from the queue */
478
    pacb = &posix_aio_state->first_aio;
479
    for(;;) {
480
        if (*pacb == NULL) {
481
            fprintf(stderr, "paio_remove: aio request not found!\n");
482
            break;
483
        } else if (*pacb == acb) {
484
            *pacb = acb->next;
485
            qemu_aio_release(acb);
486
            break;
487
        }
488
        pacb = &(*pacb)->next;
489
    }
490
}
491

  
492
static void paio_cancel(BlockDriverAIOCB *blockacb)
493
{
494
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
495
    int active = 0;
420 496

  
421 497
    mutex_lock(&lock);
422
    if (!aiocb->active) {
423
        TAILQ_REMOVE(&request_list, aiocb, node);
424
        aiocb->ret = -ECANCELED;
425
        ret = QEMU_PAIO_CANCELED;
426
    } else if (aiocb->ret == -EINPROGRESS)
427
        ret = QEMU_PAIO_NOTCANCELED;
428
    else
429
        ret = QEMU_PAIO_ALLDONE;
498
    if (!acb->active) {
499
        TAILQ_REMOVE(&request_list, acb, node);
500
        acb->ret = -ECANCELED;
501
    } else if (acb->ret == -EINPROGRESS) {
502
        active = 1;
503
    }
430 504
    mutex_unlock(&lock);
431 505

  
432
    return ret;
506
    if (active) {
507
        /* fail safe: if the aio could not be canceled, we wait for
508
           it */
509
        while (qemu_paio_error(acb) == EINPROGRESS)
510
            ;
511
    }
512

  
513
    paio_remove(acb);
514
}
515

  
516
static AIOPool raw_aio_pool = {
517
    .aiocb_size         = sizeof(struct qemu_paiocb),
518
    .cancel             = paio_cancel,
519
};
520

  
521
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
522
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
523
        BlockDriverCompletionFunc *cb, void *opaque, int type)
524
{
525
    struct qemu_paiocb *acb;
526

  
527
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
528
    if (!acb)
529
        return NULL;
530
    acb->aio_type = type;
531
    acb->aio_fildes = fd;
532
    acb->ev_signo = SIGUSR2;
533
    acb->aio_iov = qiov->iov;
534
    acb->aio_niov = qiov->niov;
535
    acb->aio_nbytes = nb_sectors * 512;
536
    acb->aio_offset = sector_num * 512;
537

  
538
    acb->next = posix_aio_state->first_aio;
539
    posix_aio_state->first_aio = acb;
540

  
541
    qemu_paio_submit(acb);
542
    return &acb->common;
543
}
544

  
545
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
546
        unsigned long int req, void *buf,
547
        BlockDriverCompletionFunc *cb, void *opaque)
548
{
549
    struct qemu_paiocb *acb;
550

  
551
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
552
    if (!acb)
553
        return NULL;
554
    acb->aio_type = QEMU_AIO_IOCTL;
555
    acb->aio_fildes = fd;
556
    acb->ev_signo = SIGUSR2;
557
    acb->aio_offset = 0;
558
    acb->aio_ioctl_buf = buf;
559
    acb->aio_ioctl_cmd = req;
560

  
561
    acb->next = posix_aio_state->first_aio;
562
    posix_aio_state->first_aio = acb;
563

  
564
    qemu_paio_submit(acb);
565
    return &acb->common;
566
}
567

  
568
void *paio_init(void)
569
{
570
    struct sigaction act;
571
    PosixAioState *s;
572
    int fds[2];
573
    int ret;
574

  
575
    if (posix_aio_state)
576
        return posix_aio_state;
577

  
578
    s = qemu_malloc(sizeof(PosixAioState));
579

  
580
    sigfillset(&act.sa_mask);
581
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
582
    act.sa_handler = aio_signal_handler;
583
    sigaction(SIGUSR2, &act, NULL);
584

  
585
    s->first_aio = NULL;
586
    if (pipe(fds) == -1) {
587
        fprintf(stderr, "failed to create pipe\n");
588
        return NULL;
589
    }
590

  
591
    s->rfd = fds[0];
592
    s->wfd = fds[1];
593

  
594
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
595
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
596

  
597
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
598

  
599
    ret = pthread_attr_init(&attr);
600
    if (ret)
601
        die2(ret, "pthread_attr_init");
602

  
603
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
604
    if (ret)
605
        die2(ret, "pthread_attr_setdetachstate");
606

  
607
    TAILQ_INIT(&request_list);
608

  
609
    posix_aio_state = s;
610

  
611
    return posix_aio_state;
433 612
}

Also available in: Unified diff