posix-aio-compat.c @ 5844997a
/*
 * QEMU posix-aio emulation
 *
 * Copyright IBM, Corp. 2008
 *
 * Authors:
 *  Anthony Liguori   <aliguori@us.ibm.com>
 *
 * This work is licensed under the terms of the GNU GPL, version 2.  See
 * the COPYING file in the top-level directory.
 *
 */

#include <sys/ioctl.h>
#include <sys/types.h>
#include <pthread.h>
#include <unistd.h>
#include <errno.h>
#include <time.h>
#include <signal.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>

#include "qemu-queue.h"
#include "osdep.h"
#include "sysemu.h"
#include "qemu-common.h"
#include "trace.h"
#include "block_int.h"

#include "block/raw-posix-aio.h"

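/*
 * Overview: I/O requests are queued on request_list and serviced by a
 * pool of up to max_threads detached worker threads.  When a worker
 * finishes a request it raises SIGUSR2; the signal handler writes a
 * byte to a pipe, which wakes the event loop so that posix_aio_read()
 * and posix_aio_process_queue() can run the completion callbacks.
 */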
struct qemu_paiocb {
    BlockDriverAIOCB common;
    int aio_fildes;
    union {
        struct iovec *aio_iov;
        void *aio_ioctl_buf;
    };
    int aio_niov;
    size_t aio_nbytes;
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;
    off_t aio_offset;

    QTAILQ_ENTRY(qemu_paiocb) node;
    int aio_type;
    ssize_t ret;
    int active;
    struct qemu_paiocb *next;

    int async_context_id;
};

typedef struct PosixAioState {
    int rfd, wfd;
    struct qemu_paiocb *first_aio;
} PosixAioState;


static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;
static int idle_threads = 0;
static QTAILQ_HEAD(, qemu_paiocb) request_list;

#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif

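/*
 * Thin wrappers around the pthread primitives; any unexpected error is
 * fatal and aborts the process via die2().
 */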
static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}

static void die(const char *what)
{
    die2(errno, what);
}

static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}

static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}

static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}

static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}

static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}

static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
    if (ret == -1)
        return -errno;

    /*
     * This looks weird, but the aio code only considers a request
     * successful if it has written the full number of bytes.
     *
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
     * so in fact we return the ioctl command here to make posix_aio_read()
     * happy..
     */
    return aiocb->aio_nbytes;
}

static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}

#ifdef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif

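/*
 * Issue a single preadv()/pwritev() covering the whole iovec, retrying
 * on EINTR.  Returns the number of bytes transferred or -errno.
 */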
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}

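/*
 * Read or write a contiguous buffer with pread()/pwrite(), looping until
 * the full aio_nbytes have been transferred, EOF (len == 0) is hit, or an
 * error occurs.  Returns the number of bytes transferred or -errno.
 */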
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
{
    ssize_t offset = 0;
    ssize_t len;

    while (offset < aiocb->aio_nbytes) {
         if (aiocb->aio_type & QEMU_AIO_WRITE)
             len = pwrite(aiocb->aio_fildes,
                          (const char *)buf + offset,
                          aiocb->aio_nbytes - offset,
                          aiocb->aio_offset + offset);
         else
             len = pread(aiocb->aio_fildes,
                         buf + offset,
                         aiocb->aio_nbytes - offset,
                         aiocb->aio_offset + offset);

         if (len == -1 && errno == EINTR)
             continue;
         else if (len == -1) {
             offset = -errno;
             break;
         } else if (len == 0)
             break;

         offset += len;
    }

    return offset;
}

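/*
 * Top-level read/write handler.  Aligned single-segment requests go
 * straight to handle_aiocb_rw_linear(); aligned multi-segment requests
 * try preadv/pwritev first; everything else (including misaligned
 * requests) is bounced through a single aligned buffer.
 */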
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    ssize_t nbytes;
    char *buf;

    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        if (aiocb->aio_niov == 1)
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);

        /*
         * We have more than one iovec, and all are properly aligned.
         *
         * Try preadv/pwritev first and fall back to linearizing the
         * buffer if it's not supported.
         */
        if (preadv_present) {
            nbytes = handle_aiocb_rw_vector(aiocb);
            if (nbytes == aiocb->aio_nbytes)
                return nbytes;
            if (nbytes < 0 && nbytes != -ENOSYS)
                return nbytes;
            preadv_present = 0;
        }

        /*
         * XXX(hch): short read/write.  no easy way to handle the remainder
         * using these interfaces.  For now retry using plain
         * pread/pwrite?
         */
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}

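/*
 * Worker thread main loop: wait (for at most 10 seconds) for a queued
 * request, execute it, store the result and notify the I/O thread by
 * sending ev_signo (SIGUSR2) to the process.  A worker that stays idle
 * past the timeout exits, shrinking the pool.
 */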
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        ssize_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
            ret = handle_aiocb_rw(aiocb);
            break;
        case QEMU_AIO_FLUSH:
            ret = handle_aiocb_flush(aiocb);
            break;
        case QEMU_AIO_IOCTL:
            ret = handle_aiocb_ioctl(aiocb);
            break;
        default:
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
            ret = -EINVAL;
            break;
        }

        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}

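/*
 * Start one more worker thread.  All signals are blocked around
 * pthread_create() so the new thread inherits a fully blocked signal
 * mask and SIGUSR2 is only ever handled by the I/O thread.
 */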
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;
    idle_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}

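/*
 * Queue a prepared request for the worker pool, spawning a new thread
 * if none is idle and the pool is not yet at max_threads.
 */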
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}

static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}

static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    if (ret < 0)
        ret = -ret;
    else
        ret = 0;

    return ret;
}

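/*
 * Walk the list of outstanding requests for the current async context
 * and fire the completion callback (or release the aiocb for cancelled
 * requests) for every request that is no longer in progress.  Returns
 * non-zero if at least one request completed.
 */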
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;
    int async_context_id = get_async_context_id();

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;

            /* we're only interested in requests in the right context */
            if (acb->async_context_id != async_context_id) {
                pacb = &acb->next;
                continue;
            }

            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }

                trace_paio_complete(acb, acb->common.opaque, ret);

                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;
            } else {
                pacb = &acb->next;
            }
        }
    }

    return result;
}

static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}

static int posix_aio_flush(void *opaque)
{
    PosixAioState *s = opaque;
    return !!s->first_aio;
}

static PosixAioState *posix_aio_state;

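/*
 * SIGUSR2 handler: write a byte to the notification pipe so that the
 * event loop wakes up and processes completed requests, then kick the
 * I/O service routine.
 */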
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;
        ssize_t ret;

        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
        if (ret < 0 && errno != EAGAIN)
            die("write()");
    }

    qemu_service_io();
}

static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}

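/*
 * Cancel a request.  If a worker has not picked it up yet it is simply
 * pulled off request_list; if it is already in flight we busy-wait for
 * it to finish before removing it from the completion list.
 */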
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    trace_paio_cancel(acb, acb->common.opaque);

    mutex_lock(&lock);
    if (!acb->active) {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}

static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};

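/*
 * Public entry point declared in block/raw-posix-aio.h: allocate an
 * aiocb, fill in the request parameters (sector_num and nb_sectors are
 * in 512-byte sectors), link it onto the completion list and hand it to
 * the worker pool.  paio_init() must have been called once beforehand.
 *
 * Illustrative caller sketch (not part of this file; the surrounding
 * names are hypothetical):
 *
 *     acb = paio_submit(bs, s->fd, sector_num, qiov, nb_sectors,
 *                       cb, opaque, QEMU_AIO_READ);
 *     if (!acb)
 *         return NULL;
 */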
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();

    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
    qemu_paio_submit(acb);
    return &acb->common;
}

BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
        unsigned long int req, void *buf,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = QEMU_AIO_IOCTL;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;
    acb->async_context_id = get_async_context_id();
    acb->aio_offset = 0;
    acb->aio_ioctl_buf = buf;
    acb->aio_ioctl_cmd = req;

    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}

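/*
 * One-time initialisation: install the SIGUSR2 handler, create the
 * non-blocking notification pipe, register it with the event loop and
 * prepare the detached-thread attributes used for the worker pool.
 * Subsequent calls are no-ops.
 */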
int paio_init(void)
{
    struct sigaction act;
    PosixAioState *s;
    int fds[2];
    int ret;

    if (posix_aio_state)
        return 0;

    s = qemu_malloc(sizeof(PosixAioState));

    sigfillset(&act.sa_mask);
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
    act.sa_handler = aio_signal_handler;
    sigaction(SIGUSR2, &act, NULL);

    s->first_aio = NULL;
    if (qemu_pipe(fds) == -1) {
        fprintf(stderr, "failed to create pipe\n");
        return -1;
    }

    s->rfd = fds[0];
    s->wfd = fds[1];

    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);

    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
        posix_aio_process_queue, s);

    ret = pthread_attr_init(&attr);
    if (ret)
        die2(ret, "pthread_attr_init");

    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    if (ret)
        die2(ret, "pthread_attr_setdetachstate");

    QTAILQ_INIT(&request_list);

    posix_aio_state = s;
    return 0;
}