root / posix-aio-compat.c @ a74cdab4

/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

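/*
 * One queued asynchronous request.  The fields carry what a worker thread
 * needs to perform the operation: file descriptor, buffer(s), byte count
 * and offset.  'ret' is -EINPROGRESS while the request is pending and holds
 * the result (or a negative errno) once a worker has finished; 'active' is
 * set when a worker has picked the request up, which is what paio_cancel()
 * checks before deciding whether the request can still be dequeued.
 */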
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;

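/*
 * Thread-pool state shared with the worker threads.  'lock' protects
 * request_list as well as each request's 'ret' and 'active' fields.
 * Workers are started on demand, up to max_threads, and terminate on
 * their own after sitting idle for about ten seconds (see aio_thread).
 */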
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy.
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

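/*
 * Issue the whole request with a single preadv()/pwritev() call, retrying
 * if the call is interrupted by a signal.  Returns the number of bytes
 * transferred or a negative errno.
 */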
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

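/*
 * Transfer a single linear buffer with pread()/pwrite(), looping until the
 * full byte count has been transferred, EOF is hit, or an error occurs.
 * Returns the number of bytes transferred or a negative errno.
 */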
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

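/*
 * Worker thread body: pull requests off request_list, dispatch them by
 * type, publish the result in aiocb->ret under 'lock', and notify the
 * I/O thread by sending ev_signo (SIGUSR2) to the process.  The thread
 * exits if no request arrives within the ten second timeout.
 */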
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            idle_threads++;
            ret = cond_timedwait(&cond, &lock, &ts);
            idle_threads--;
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

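/*
 * Start one more detached worker.  All signals are blocked around
 * pthread_create() so the new thread inherits a fully blocked signal
 * mask and never handles the completion signal itself.
 */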
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

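/*
 * Queue a request for the worker pool: mark it pending, append it to
 * request_list and wake a worker, spawning a new one only when no thread
 * is idle and the pool is still below max_threads.
 */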
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

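/*
 * Walk the first_aio list and complete every finished request that belongs
 * to the current async context: cancelled requests are simply released,
 * short transfers are reported as -EINVAL, and everything else is handed
 * to the request's completion callback.  Returns non-zero if any request
 * was completed.
 */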
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }

                trace_paio_complete(acb, acb->common.opaque, ret);

                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

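/*
 * SIGUSR2 handler: runs in signal context, so it only writes one byte to
 * the notification pipe (EAGAIN is ignored when the pipe is already full)
 * to make the main loop run posix_aio_read(), and then calls
 * qemu_service_io().
 */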
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

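/*
 * Submit a read/write/flush request: allocate an acb from raw_aio_pool,
 * translate the sector-based parameters into a byte offset and length
 * (512-byte sectors), link the acb onto first_aio and hand it to the
 * worker pool.
 */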
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

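/*
 * One-time initialization: install the SIGUSR2 completion handler, create
 * the non-blocking notification pipe and register its read end with the
 * main loop, and prepare the detached thread attributes used when spawning
 * workers.  Safe to call more than once.
 */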
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}