posix-aio-compat.c @ 9bf0960a


/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

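/* Per-request control block for the POSIX AIO emulation: a submitted
 * request is queued on request_list, picked up by a worker thread, and
 * completed back to the caller through common.cb. */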
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;

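/* Shared thread-pool state; 'lock' protects request_list, the thread
 * counters and the per-request 'ret'/'active' fields. */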
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy.
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

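/* Dispatch a read/write request: aligned single-segment requests go
 * straight to pread/pwrite, aligned multi-segment requests try
 * preadv/pwritev, and everything else is bounced through a single
 * aligned buffer. */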
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

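/* Worker thread: wait up to 10 seconds for a queued request, execute it,
 * store the result and signal completion to the main thread; exit if the
 * queue stays empty past the timeout. */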
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            idle_threads++;
            ret = cond_timedwait(&cond, &lock, &ts);
            idle_threads--;
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

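/* Start a detached worker thread with all signals blocked, so that the
 * completion signal is handled by the main thread rather than a worker. */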
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

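/* Walk the list of in-flight requests and finish those that belong to the
 * current async context: fire the completion callback and release the
 * aiocb.  Returns non-zero if any request was completed or removed. */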
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }

                trace_paio_complete(acb, acb->common.opaque, ret);

                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

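/* Runs when a worker signals completion: write a byte to the notification
 * pipe so that posix_aio_read() runs in the main loop and processes the
 * finished requests. */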
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

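/* Cancel a request: if no worker has picked it up yet it is simply
 * dequeued, otherwise we busy-wait until the worker finishes it. */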
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

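/* Public entry point for read/write/flush requests; offsets and lengths
 * are given in 512-byte sectors. */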
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

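/* One-time setup: install the SIGUSR2 completion handler, create the
 * non-blocking notification pipe, register it with the main loop, and
 * prepare the detached thread attributes. */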
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}