Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 70fc55eb

History | View | Annotate | Download (15.8 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <sys/types.h>
16
#include <pthread.h>
17
#include <unistd.h>
18
#include <errno.h>
19
#include <time.h>
20
#include <signal.h>
21
#include <string.h>
22
#include <stdlib.h>
23
#include <stdio.h>
24

    
25
#include "qemu-queue.h"
26
#include "osdep.h"
27
#include "qemu-common.h"
28
#include "trace.h"
29
#include "block_int.h"
30

    
31
#include "block/raw-posix-aio.h"
32

    
33

    
34
struct qemu_paiocb {
35
    BlockDriverAIOCB common;
36
    int aio_fildes;
37
    union {
38
        struct iovec *aio_iov;
39
        void *aio_ioctl_buf;
40
    };
41
    int aio_niov;
42
    size_t aio_nbytes;
43
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
44
    int ev_signo;
45
    off_t aio_offset;
46

    
47
    QTAILQ_ENTRY(qemu_paiocb) node;
48
    int aio_type;
49
    ssize_t ret;
50
    int active;
51
    struct qemu_paiocb *next;
52

    
53
    int async_context_id;
54
};
55

    
56
/*
 * Completion-side state: the notification pipe and the singly linked
 * list (through qemu_paiocb.next) of all submitted requests.
 */
typedef struct PosixAioState {
    int rfd;                        /* read end of the notification pipe */
    int wfd;                        /* write end, used by the signal handler */
    struct qemu_paiocb *first_aio;  /* head of the submitted-request list */
} PosixAioState;
60

    
61

    
62
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
63
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
64
static pthread_t thread_id;
65
static pthread_attr_t attr;
66
static int max_threads = 64;
67
static int cur_threads = 0;
68
static int idle_threads = 0;
69
static QTAILQ_HEAD(, qemu_paiocb) request_list;
70

    
71
/*
 * Non-zero while preadv()/pwritev() are believed to be usable.  Starts
 * from the build-time setting and is cleared by handle_aiocb_rw().
 */
#if defined(CONFIG_PREADV)
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
76

    
77
/*
 * Print "<what> failed: <error text>" for error number 'err' to stderr
 * and abort the process.  Never returns.
 */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
82

    
83
/* Like die2(), but takes the error number from the current errno. */
static void die(const char *what)
{
    int err = errno;

    die2(err, what);
}
87

    
88
/* Lock 'mutex'; any failure is fatal. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
93

    
94
/* Unlock 'mutex'; any failure is fatal. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_unlock");
    }
}
99

    
100
/*
 * Wait on 'cond' until signalled or until the absolute time 'ts'.
 *
 * Returns 0 or ETIMEDOUT; every other pthread error is fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int rc = pthread_cond_timedwait(cond, mutex, ts);

    if (rc != 0 && rc != ETIMEDOUT) {
        die2(rc, "pthread_cond_timedwait");
    }
    return rc;
}
107

    
108
/* Wake one waiter on 'cond'; any failure is fatal. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err != 0) {
        die2(err, "pthread_cond_signal");
    }
}
113

    
114
/* Start a thread running start_routine(arg); any failure is fatal. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err != 0) {
        die2(err, "pthread_create");
    }
}
120

    
121
static ssize_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
122
{
123
    int ret;
124

    
125
    ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
126
    if (ret == -1)
127
        return -errno;
128

    
129
    /*
130
     * This looks weird, but the aio code only consideres a request
131
     * successful if it has written the number full number of bytes.
132
     *
133
     * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
134
     * so in fact we return the ioctl command here to make posix_aio_read()
135
     * happy..
136
     */
137
    return aiocb->aio_nbytes;
138
}
139

    
140
static ssize_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
141
{
142
    int ret;
143

    
144
    ret = qemu_fdatasync(aiocb->aio_fildes);
145
    if (ret == -1)
146
        return -errno;
147
    return 0;
148
}
149

    
150
/*
 * Vectored positional read.  Forwards to preadv() when CONFIG_PREADV is
 * defined; otherwise returns -ENOSYS directly (errno is not touched).
 */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
#ifdef CONFIG_PREADV
    return preadv(fd, iov, nr_iov, offset);
#else
    return -ENOSYS;
#endif
}

/*
 * Vectored positional write.  Forwards to pwritev() when CONFIG_PREADV
 * is defined; otherwise returns -ENOSYS directly (errno is not touched).
 */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
#ifdef CONFIG_PREADV
    return pwritev(fd, iov, nr_iov, offset);
#else
    return -ENOSYS;
#endif
}
179

    
180
static ssize_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
181
{
182
    size_t offset = 0;
183
    ssize_t len;
184

    
185
    do {
186
        if (aiocb->aio_type & QEMU_AIO_WRITE)
187
            len = qemu_pwritev(aiocb->aio_fildes,
188
                               aiocb->aio_iov,
189
                               aiocb->aio_niov,
190
                               aiocb->aio_offset + offset);
191
         else
192
            len = qemu_preadv(aiocb->aio_fildes,
193
                              aiocb->aio_iov,
194
                              aiocb->aio_niov,
195
                              aiocb->aio_offset + offset);
196
    } while (len == -1 && errno == EINTR);
197

    
198
    if (len == -1)
199
        return -errno;
200
    return len;
201
}
202

    
203
static ssize_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
204
{
205
    ssize_t offset = 0;
206
    ssize_t len;
207

    
208
    while (offset < aiocb->aio_nbytes) {
209
         if (aiocb->aio_type & QEMU_AIO_WRITE)
210
             len = pwrite(aiocb->aio_fildes,
211
                          (const char *)buf + offset,
212
                          aiocb->aio_nbytes - offset,
213
                          aiocb->aio_offset + offset);
214
         else
215
             len = pread(aiocb->aio_fildes,
216
                         buf + offset,
217
                         aiocb->aio_nbytes - offset,
218
                         aiocb->aio_offset + offset);
219

    
220
         if (len == -1 && errno == EINTR)
221
             continue;
222
         else if (len == -1) {
223
             offset = -errno;
224
             break;
225
         } else if (len == 0)
226
             break;
227

    
228
         offset += len;
229
    }
230

    
231
    return offset;
232
}
233

    
234
static ssize_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
235
{
236
    ssize_t nbytes;
237
    char *buf;
238

    
239
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
240
        /*
241
         * If there is just a single buffer, and it is properly aligned
242
         * we can just use plain pread/pwrite without any problems.
243
         */
244
        if (aiocb->aio_niov == 1)
245
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
246

    
247
        /*
248
         * We have more than one iovec, and all are properly aligned.
249
         *
250
         * Try preadv/pwritev first and fall back to linearizing the
251
         * buffer if it's not supported.
252
         */
253
        if (preadv_present) {
254
            nbytes = handle_aiocb_rw_vector(aiocb);
255
            if (nbytes == aiocb->aio_nbytes)
256
                return nbytes;
257
            if (nbytes < 0 && nbytes != -ENOSYS)
258
                return nbytes;
259
            preadv_present = 0;
260
        }
261

    
262
        /*
263
         * XXX(hch): short read/write.  no easy way to handle the reminder
264
         * using these interfaces.  For now retry using plain
265
         * pread/pwrite?
266
         */
267
    }
268

    
269
    /*
270
     * Ok, we have to do it the hard way, copy all segments into
271
     * a single aligned buffer.
272
     */
273
    buf = qemu_blockalign(aiocb->common.bs, aiocb->aio_nbytes);
274
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
275
        char *p = buf;
276
        int i;
277

    
278
        for (i = 0; i < aiocb->aio_niov; ++i) {
279
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
280
            p += aiocb->aio_iov[i].iov_len;
281
        }
282
    }
283

    
284
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
285
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
286
        char *p = buf;
287
        size_t count = aiocb->aio_nbytes, copy;
288
        int i;
289

    
290
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
291
            copy = count;
292
            if (copy > aiocb->aio_iov[i].iov_len)
293
                copy = aiocb->aio_iov[i].iov_len;
294
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
295
            p     += copy;
296
            count -= copy;
297
        }
298
    }
299
    qemu_vfree(buf);
300

    
301
    return nbytes;
302
}
303

    
304
static void *aio_thread(void *unused)
305
{
306
    pid_t pid;
307

    
308
    pid = getpid();
309

    
310
    while (1) {
311
        struct qemu_paiocb *aiocb;
312
        ssize_t ret = 0;
313
        qemu_timeval tv;
314
        struct timespec ts;
315

    
316
        qemu_gettimeofday(&tv);
317
        ts.tv_sec = tv.tv_sec + 10;
318
        ts.tv_nsec = 0;
319

    
320
        mutex_lock(&lock);
321

    
322
        while (QTAILQ_EMPTY(&request_list) &&
323
               !(ret == ETIMEDOUT)) {
324
            ret = cond_timedwait(&cond, &lock, &ts);
325
        }
326

    
327
        if (QTAILQ_EMPTY(&request_list))
328
            break;
329

    
330
        aiocb = QTAILQ_FIRST(&request_list);
331
        QTAILQ_REMOVE(&request_list, aiocb, node);
332
        aiocb->active = 1;
333
        idle_threads--;
334
        mutex_unlock(&lock);
335

    
336
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
337
        case QEMU_AIO_READ:
338
        case QEMU_AIO_WRITE:
339
            ret = handle_aiocb_rw(aiocb);
340
            break;
341
        case QEMU_AIO_FLUSH:
342
            ret = handle_aiocb_flush(aiocb);
343
            break;
344
        case QEMU_AIO_IOCTL:
345
            ret = handle_aiocb_ioctl(aiocb);
346
            break;
347
        default:
348
            fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
349
            ret = -EINVAL;
350
            break;
351
        }
352

    
353
        mutex_lock(&lock);
354
        aiocb->ret = ret;
355
        idle_threads++;
356
        mutex_unlock(&lock);
357

    
358
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
359
    }
360

    
361
    idle_threads--;
362
    cur_threads--;
363
    mutex_unlock(&lock);
364

    
365
    return NULL;
366
}
367

    
368
static void spawn_thread(void)
369
{
370
    sigset_t set, oldset;
371

    
372
    cur_threads++;
373
    idle_threads++;
374

    
375
    /* block all signals */
376
    if (sigfillset(&set)) die("sigfillset");
377
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
378

    
379
    thread_create(&thread_id, &attr, aio_thread, NULL);
380

    
381
    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
382
}
383

    
384
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
385
{
386
    aiocb->ret = -EINPROGRESS;
387
    aiocb->active = 0;
388
    mutex_lock(&lock);
389
    if (idle_threads == 0 && cur_threads < max_threads)
390
        spawn_thread();
391
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
392
    mutex_unlock(&lock);
393
    cond_signal(&cond);
394
}
395

    
396
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
397
{
398
    ssize_t ret;
399

    
400
    mutex_lock(&lock);
401
    ret = aiocb->ret;
402
    mutex_unlock(&lock);
403

    
404
    return ret;
405
}
406

    
407
/*
 * Error state of a request as a non-negative value: the negated 'ret'
 * when it is negative (e.g. EINPROGRESS, ECANCELED), otherwise 0.
 */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t value = qemu_paio_return(aiocb);

    return (value < 0) ? -value : 0;
}
418

    
419
static int posix_aio_process_queue(void *opaque)
420
{
421
    PosixAioState *s = opaque;
422
    struct qemu_paiocb *acb, **pacb;
423
    int ret;
424
    int result = 0;
425
    int async_context_id = get_async_context_id();
426

    
427
    for(;;) {
428
        pacb = &s->first_aio;
429
        for(;;) {
430
            acb = *pacb;
431
            if (!acb)
432
                return result;
433

    
434
            /* we're only interested in requests in the right context */
435
            if (acb->async_context_id != async_context_id) {
436
                pacb = &acb->next;
437
                continue;
438
            }
439

    
440
            ret = qemu_paio_error(acb);
441
            if (ret == ECANCELED) {
442
                /* remove the request */
443
                *pacb = acb->next;
444
                qemu_aio_release(acb);
445
                result = 1;
446
            } else if (ret != EINPROGRESS) {
447
                /* end of aio */
448
                if (ret == 0) {
449
                    ret = qemu_paio_return(acb);
450
                    if (ret == acb->aio_nbytes)
451
                        ret = 0;
452
                    else
453
                        ret = -EINVAL;
454
                } else {
455
                    ret = -ret;
456
                }
457
                /* remove the request */
458
                *pacb = acb->next;
459
                /* call the callback */
460
                acb->common.cb(acb->common.opaque, ret);
461
                qemu_aio_release(acb);
462
                result = 1;
463
                break;
464
            } else {
465
                pacb = &acb->next;
466
            }
467
        }
468
    }
469

    
470
    return result;
471
}
472

    
473
static void posix_aio_read(void *opaque)
474
{
475
    PosixAioState *s = opaque;
476
    ssize_t len;
477

    
478
    /* read all bytes from signal pipe */
479
    for (;;) {
480
        char bytes[16];
481

    
482
        len = read(s->rfd, bytes, sizeof(bytes));
483
        if (len == -1 && errno == EINTR)
484
            continue; /* try again */
485
        if (len == sizeof(bytes))
486
            continue; /* more to read */
487
        break;
488
    }
489

    
490
    posix_aio_process_queue(s);
491
}
492

    
493
static int posix_aio_flush(void *opaque)
494
{
495
    PosixAioState *s = opaque;
496
    return !!s->first_aio;
497
}
498

    
499
/* The single AIO state; NULL until paio_init() has completed. */
static PosixAioState *posix_aio_state;
500

    
501
static void aio_signal_handler(int signum)
502
{
503
    if (posix_aio_state) {
504
        char byte = 0;
505
        ssize_t ret;
506

    
507
        ret = write(posix_aio_state->wfd, &byte, sizeof(byte));
508
        if (ret < 0 && errno != EAGAIN)
509
            die("write()");
510
    }
511

    
512
    qemu_service_io();
513
}
514

    
515
static void paio_remove(struct qemu_paiocb *acb)
516
{
517
    struct qemu_paiocb **pacb;
518

    
519
    /* remove the callback from the queue */
520
    pacb = &posix_aio_state->first_aio;
521
    for(;;) {
522
        if (*pacb == NULL) {
523
            fprintf(stderr, "paio_remove: aio request not found!\n");
524
            break;
525
        } else if (*pacb == acb) {
526
            *pacb = acb->next;
527
            qemu_aio_release(acb);
528
            break;
529
        }
530
        pacb = &(*pacb)->next;
531
    }
532
}
533

    
534
/*
 * Cancel callback of the AIO pool.
 *
 * A request no worker has picked up yet is taken off request_list and
 * marked -ECANCELED.  A request a worker is still executing cannot be
 * stopped, so it is waited for.  In every case the request is finally
 * removed from the submitted list and released.
 */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int in_flight;

    mutex_lock(&lock);
    if (acb->active) {
        in_flight = (acb->ret == -EINPROGRESS);
    } else {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
        in_flight = 0;
    }
    mutex_unlock(&lock);

    if (in_flight) {
        /* fail safe: if the aio could not be canceled, we wait for it */
        while (qemu_paio_error(acb) == EINPROGRESS) {
            /* poll until the worker has stored a result */
        }
    }

    paio_remove(acb);
}
557

    
558
static AIOPool raw_aio_pool = {
559
    .aiocb_size         = sizeof(struct qemu_paiocb),
560
    .cancel             = paio_cancel,
561
};
562

    
563
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, int fd,
564
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
565
        BlockDriverCompletionFunc *cb, void *opaque, int type)
566
{
567
    struct qemu_paiocb *acb;
568

    
569
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
570
    if (!acb)
571
        return NULL;
572
    acb->aio_type = type;
573
    acb->aio_fildes = fd;
574
    acb->ev_signo = SIGUSR2;
575
    acb->async_context_id = get_async_context_id();
576

    
577
    if (qiov) {
578
        acb->aio_iov = qiov->iov;
579
        acb->aio_niov = qiov->niov;
580
    }
581
    acb->aio_nbytes = nb_sectors * 512;
582
    acb->aio_offset = sector_num * 512;
583

    
584
    acb->next = posix_aio_state->first_aio;
585
    posix_aio_state->first_aio = acb;
586

    
587
    trace_paio_submit(acb, opaque, sector_num, nb_sectors, type);
588
    qemu_paio_submit(acb);
589
    return &acb->common;
590
}
591

    
592
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
593
        unsigned long int req, void *buf,
594
        BlockDriverCompletionFunc *cb, void *opaque)
595
{
596
    struct qemu_paiocb *acb;
597

    
598
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
599
    if (!acb)
600
        return NULL;
601
    acb->aio_type = QEMU_AIO_IOCTL;
602
    acb->aio_fildes = fd;
603
    acb->ev_signo = SIGUSR2;
604
    acb->async_context_id = get_async_context_id();
605
    acb->aio_offset = 0;
606
    acb->aio_ioctl_buf = buf;
607
    acb->aio_ioctl_cmd = req;
608

    
609
    acb->next = posix_aio_state->first_aio;
610
    posix_aio_state->first_aio = acb;
611

    
612
    qemu_paio_submit(acb);
613
    return &acb->common;
614
}
615

    
616
int paio_init(void)
617
{
618
    struct sigaction act;
619
    PosixAioState *s;
620
    int fds[2];
621
    int ret;
622

    
623
    if (posix_aio_state)
624
        return 0;
625

    
626
    s = qemu_malloc(sizeof(PosixAioState));
627

    
628
    sigfillset(&act.sa_mask);
629
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
630
    act.sa_handler = aio_signal_handler;
631
    sigaction(SIGUSR2, &act, NULL);
632

    
633
    s->first_aio = NULL;
634
    if (qemu_pipe(fds) == -1) {
635
        fprintf(stderr, "failed to create pipe\n");
636
        return -1;
637
    }
638

    
639
    s->rfd = fds[0];
640
    s->wfd = fds[1];
641

    
642
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
643
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
644

    
645
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
646
        posix_aio_process_queue, s);
647

    
648
    ret = pthread_attr_init(&attr);
649
    if (ret)
650
        die2(ret, "pthread_attr_init");
651

    
652
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
653
    if (ret)
654
        die2(ret, "pthread_attr_setdetachstate");
655

    
656
    QTAILQ_INIT(&request_list);
657

    
658
    posix_aio_state = s;
659
    return 0;
660
}