posix-aio-compat.c @ dc786bc9

/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"


struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;


static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                          struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy.
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

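/*
 * Transfer the whole request with a single preadv/pwritev call, retrying
 * on EINTR.  Returns the number of bytes transferred or -errno; short
 * transfers are left for the caller to handle.
 */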
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

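/*
 * Transfer a request through a single contiguous buffer using pread/pwrite,
 * looping until all bytes are done, an error occurs or EOF is reached.
 * Returns the number of bytes transferred or -errno.
 */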
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

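/*
 * Dispatch a read/write request: take the linear or vectored fast path
 * when the buffers are properly aligned, otherwise bounce the data
 * through a single aligned buffer.
 */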
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned,
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

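/*
 * Worker thread: pull requests off request_list, execute them, store the
 * result in aiocb->ret and report completion by sending aiocb->ev_signo
 * to the process.  A worker exits after ten seconds without work.
 */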
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

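/*
 * Start one detached worker thread.  All signals are blocked around
 * pthread_create() so the worker inherits a fully blocked signal mask and
 * never handles the completion signal itself.
 */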
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;
    idle_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

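/*
 * Hand a request to the thread pool: spawn a new worker if none are idle
 * (up to max_threads), append the request to request_list and wake up a
 * waiting worker.
 */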
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

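/*
 * Walk the list of in-flight requests and finish those that belong to the
 * current async context: cancelled requests are simply released, completed
 * ones get their callback invoked with 0 on a full transfer and a negative
 * error code otherwise.  Returns nonzero if any request was reaped.
 */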
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }
                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

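/*
 * fd handler for the read end of the signal pipe: drain it completely,
 * then process any finished requests.
 */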
static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

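/*
 * Completion signal handler: wake up the main loop by writing a byte to
 * the self-pipe, then call qemu_service_io().
 */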
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

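/*
 * Cancel a request: if it has not been picked up yet, drop it from
 * request_list and mark it canceled; if a worker is already processing
 * it, busy-wait for completion.  Either way the aiocb is unlinked from
 * the completion list afterwards.
 */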
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

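/*
 * Submit a read, write or flush request.  sector_num and nb_sectors are
 * in 512-byte sectors; the aiocb is linked into the completion list
 * before it is queued for the worker threads.
 */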
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

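/*
 * One-time initialization: install the SIGUSR2 completion handler, create
 * the non-blocking self-pipe, hook its read end into the AIO fd handlers
 * and prepare the detached thread attributes for the worker pool.
 */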
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}