Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ b53d44e5

History | View | Annotate | Download (14.9 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <sys/types.h>
16
#include <pthread.h>
17
#include <unistd.h>
18
#include <errno.h>
19
#include <time.h>
20
#include <signal.h>
21
#include <string.h>
22
#include <stdlib.h>
23
#include <stdio.h>
24

    
25
#include "qemu-queue.h"
26
#include "osdep.h"
27
#include "qemu-common.h"
28
#include "block_int.h"
29

    
30
#include "block/raw-posix-aio.h"
31

    
32

    
33
struct qemu_paiocb {
34
    BlockDriverAIOCB common;
35
    int aio_fildes;
36
    union {
37
        struct iovec *aio_iov;
38
        void *aio_ioctl_buf;
39
    };
40
    int aio_niov;
41
    size_t aio_nbytes;
42
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
43
    int ev_signo;
44
    off_t aio_offset;
45

    
46
    QTAILQ_ENTRY(qemu_paiocb) node;
47
    int aio_type;
48
    ssize_t ret;
49
    int active;
50
    struct qemu_paiocb *next;
51
};
52

    
53
/*
 * Completion-side state: the notification pipe written by the signal
 * handler and the singly linked list of requests not yet completed.
 */
typedef struct PosixAioState {
    int rfd;                        /* read end of the notification pipe */
    int wfd;                        /* write end of the notification pipe */
    struct qemu_paiocb *first_aio;  /* head of the pending-request list */
} PosixAioState;
57

    
58

    
59
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
60
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
61
static pthread_t thread_id;
62
static pthread_attr_t attr;
63
static int max_threads = 64;
64
static int cur_threads = 0;
65
static int idle_threads = 0;
66
static QTAILQ_HEAD(, qemu_paiocb) request_list;
67

    
68
/*
 * Non-zero while vectored I/O through preadv()/pwritev() should be tried.
 * Starts out according to the build configuration and is cleared at
 * run time by handle_aiocb_rw() when the vectored path does not work out.
 */
#ifndef CONFIG_PREADV
static int preadv_present = 0;
#else
static int preadv_present = 1;
#endif
73

    
74
/*
 * Print "<what> failed: <description of err>" on stderr and abort the
 * process.  'err' is an errno-style error number.
 */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
79

    
80
/* Like die2(), taking the error number from the current value of errno. */
static void die(const char *what)
{
    int err = errno;

    die2(err, what);
}
84

    
85
/* Lock 'mutex'; any failure is fatal. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_lock");
    }
}
90

    
91
/* Unlock 'mutex'; any failure is fatal. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err != 0) {
        die2(err, "pthread_mutex_unlock");
    }
}
96

    
97
/*
 * Wait on 'cond' until it is signalled or the absolute time 'ts' passes.
 *
 * Returns 0 or ETIMEDOUT; every other pthread_cond_timedwait() result is
 * treated as fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int err = pthread_cond_timedwait(cond, mutex, ts);

    if (err != 0 && err != ETIMEDOUT) {
        die2(err, "pthread_cond_timedwait");
    }
    return err;
}
104

    
105
/* Signal 'cond'; any failure is fatal. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err != 0) {
        die2(err, "pthread_cond_signal");
    }
}
110

    
111
/* Start a thread with pthread_create(); any failure is fatal. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err != 0) {
        die2(err, "pthread_create");
    }
}
117

    
118
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
119
{
120
        int ret;
121

    
122
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
123
        if (ret == -1)
124
                return -errno;
125

    
126
        /*
127
         * This looks weird, but the aio code only consideres a request
128
         * successfull if it has written the number full number of bytes.
129
         *
130
         * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
131
         * so in fact we return the ioctl command here to make posix_aio_read()
132
         * happy..
133
         */
134
        return aiocb->aio_nbytes;
135
}
136

    
137
static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
138
{
139
    int ret;
140

    
141
    ret = qemu_fdatasync(aiocb->aio_fildes);
142
    if (ret == -1)
143
        return -errno;
144
    return 0;
145
}
146

    
147
/*
 * Vectored positional I/O helpers.
 *
 * Without CONFIG_PREADV both helpers are stubs that return -ENOSYS
 * directly (not through errno); with it they forward to preadv() and
 * pwritev() unchanged.
 */
#ifndef CONFIG_PREADV

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#else

static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#endif
176

    
177
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
178
{
179
    size_t offset = 0;
180
    ssize_t len;
181

    
182
    do {
183
        if (aiocb->aio_type & QEMU_AIO_WRITE)
184
            len = qemu_pwritev(aiocb->aio_fildes,
185
                               aiocb->aio_iov,
186
                               aiocb->aio_niov,
187
                               aiocb->aio_offset + offset);
188
         else
189
            len = qemu_preadv(aiocb->aio_fildes,
190
                              aiocb->aio_iov,
191
                              aiocb->aio_niov,
192
                              aiocb->aio_offset + offset);
193
    } while (len == -1 && errno == EINTR);
194

    
195
    if (len == -1)
196
        return -errno;
197
    return len;
198
}
199

    
200
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
201
{
202
    size_t offset = 0;
203
    size_t len;
204

    
205
    while (offset < aiocb->aio_nbytes) {
206
         if (aiocb->aio_type & QEMU_AIO_WRITE)
207
             len = pwrite(aiocb->aio_fildes,
208
                          (const char *)buf + offset,
209
                          aiocb->aio_nbytes - offset,
210
                          aiocb->aio_offset + offset);
211
         else
212
             len = pread(aiocb->aio_fildes,
213
                         buf + offset,
214
                         aiocb->aio_nbytes - offset,
215
                         aiocb->aio_offset + offset);
216

    
217
         if (len == -1 && errno == EINTR)
218
             continue;
219
         else if (len == -1) {
220
             offset = -errno;
221
             break;
222
         } else if (len == 0)
223
             break;
224

    
225
         offset += len;
226
    }
227

    
228
    return offset;
229
}
230

    
231
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
232
{
233
    size_t nbytes;
234
    char *buf;
235

    
236
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
237
        /*
238
         * If there is just a single buffer, and it is properly aligned
239
         * we can just use plain pread/pwrite without any problems.
240
         */
241
        if (aiocb->aio_niov == 1)
242
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
243

    
244
        /*
245
         * We have more than one iovec, and all are properly aligned.
246
         *
247
         * Try preadv/pwritev first and fall back to linearizing the
248
         * buffer if it's not supported.
249
         */
250
        if (preadv_present) {
251
            nbytes = handle_aiocb_rw_vector(aiocb);
252
            if (nbytes == aiocb->aio_nbytes)
253
                return nbytes;
254
            if (nbytes < 0 && nbytes != -ENOSYS)
255
                return nbytes;
256
            preadv_present = 0;
257
        }
258

    
259
        /*
260
         * XXX(hch): short read/write.  no easy way to handle the reminder
261
         * using these interfaces.  For now retry using plain
262
         * pread/pwrite?
263
         */
264
    }
265

    
266
    /*
267
     * Ok, we have to do it the hard way, copy all segments into
268
     * a single aligned buffer.
269
     */
270
    buf = qemu_memalign(512, aiocb->aio_nbytes);
271
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
272
        char *p = buf;
273
        int i;
274

    
275
        for (i = 0; i < aiocb->aio_niov; ++i) {
276
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
277
            p += aiocb->aio_iov[i].iov_len;
278
        }
279
    }
280

    
281
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
282
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
283
        char *p = buf;
284
        size_t count = aiocb->aio_nbytes, copy;
285
        int i;
286

    
287
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
288
            copy = count;
289
            if (copy > aiocb->aio_iov[i].iov_len)
290
                copy = aiocb->aio_iov[i].iov_len;
291
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
292
            p     += copy;
293
            count -= copy;
294
        }
295
    }
296
    qemu_vfree(buf);
297

    
298
    return nbytes;
299
}
300

    
301
static void *aio_thread(void *unused)
302
{
303
    pid_t pid;
304

    
305
    pid = getpid();
306

    
307
    while (1) {
308
        struct qemu_paiocb *aiocb;
309
        size_t ret = 0;
310
        qemu_timeval tv;
311
        struct timespec ts;
312

    
313
        qemu_gettimeofday(&tv);
314
        ts.tv_sec = tv.tv_sec + 10;
315
        ts.tv_nsec = 0;
316

    
317
        mutex_lock(&lock);
318

    
319
        while (QTAILQ_EMPTY(&request_list) &&
320
               !(ret == ETIMEDOUT)) {
321
            ret = cond_timedwait(&cond, &lock, &ts);
322
        }
323

    
324
        if (QTAILQ_EMPTY(&request_list))
325
            break;
326

    
327
        aiocb = QTAILQ_FIRST(&request_list);
328
        QTAILQ_REMOVE(&request_list, aiocb, node);
329
        aiocb->active = 1;
330
        idle_threads--;
331
        mutex_unlock(&lock);
332

    
333
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
334
        case QEMU_AIO_READ:
335
        case QEMU_AIO_WRITE:
336
                ret = handle_aiocb_rw(aiocb);
337
                break;
338
        case QEMU_AIO_FLUSH:
339
                ret = handle_aiocb_flush(aiocb);
340
                break;
341
        case QEMU_AIO_IOCTL:
342
                ret = handle_aiocb_ioctl(aiocb);
343
                break;
344
        default:
345
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
346
                ret = -EINVAL;
347
                break;
348
        }
349

    
350
        mutex_lock(&lock);
351
        aiocb->ret = ret;
352
        idle_threads++;
353
        mutex_unlock(&lock);
354

    
355
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
356
    }
357

    
358
    idle_threads--;
359
    cur_threads--;
360
    mutex_unlock(&lock);
361

    
362
    return NULL;
363
}
364

    
365
static void spawn_thread(void)
366
{
367
    sigset_t set, oldset;
368

    
369
    cur_threads++;
370
    idle_threads++;
371

    
372
    /* block all signals */
373
    if (sigfillset(&set)) die("sigfillset");
374
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");
375

    
376
    thread_create(&thread_id, &attr, aio_thread, NULL);
377

    
378
    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
379
}
380

    
381
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
382
{
383
    aiocb->ret = -EINPROGRESS;
384
    aiocb->active = 0;
385
    mutex_lock(&lock);
386
    if (idle_threads == 0 && cur_threads < max_threads)
387
        spawn_thread();
388
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
389
    mutex_unlock(&lock);
390
    cond_signal(&cond);
391
}
392

    
393
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
394
{
395
    ssize_t ret;
396

    
397
    mutex_lock(&lock);
398
    ret = aiocb->ret;
399
    mutex_unlock(&lock);
400

    
401
    return ret;
402
}
403

    
404
/*
 * Error state of a request as a positive errno value: EINPROGRESS while
 * the result is still -EINPROGRESS, ECANCELED after a cancel, and 0 when
 * the stored result is not negative.
 */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t status = qemu_paio_return(aiocb);

    if (status >= 0) {
        status = 0;
    } else {
        status = -status;
    }

    return status;
}
415

    
416
static void posix_aio_read(void *opaque)
417
{
418
    PosixAioState *s = opaque;
419
    struct qemu_paiocb *acb, **pacb;
420
    int ret;
421
    ssize_t len;
422

    
423
    /* read all bytes from signal pipe */
424
    for (;;) {
425
        char bytes[16];
426

    
427
        len = read(s->rfd, bytes, sizeof(bytes));
428
        if (len == -1 && errno == EINTR)
429
            continue; /* try again */
430
        if (len == sizeof(bytes))
431
            continue; /* more to read */
432
        break;
433
    }
434

    
435
    for(;;) {
436
        pacb = &s->first_aio;
437
        for(;;) {
438
            acb = *pacb;
439
            if (!acb)
440
                goto the_end;
441
            ret = qemu_paio_error(acb);
442
            if (ret == ECANCELED) {
443
                /* remove the request */
444
                *pacb = acb->next;
445
                qemu_aio_release(acb);
446
            } else if (ret != EINPROGRESS) {
447
                /* end of aio */
448
                if (ret == 0) {
449
                    ret = qemu_paio_return(acb);
450
                    if (ret == acb->aio_nbytes)
451
                        ret = 0;
452
                    else
453
                        ret = -EINVAL;
454
                } else {
455
                    ret = -ret;
456
                }
457
                /* remove the request */
458
                *pacb = acb->next;
459
                /* call the callback */
460
                acb->common.cb(acb->common.opaque, ret);
461
                qemu_aio_release(acb);
462
                break;
463
            } else {
464
                pacb = &acb->next;
465
            }
466
        }
467
    }
468
 the_end: ;
469
}
470

    
471
static int posix_aio_flush(void *opaque)
472
{
473
    PosixAioState *s = opaque;
474
    return !!s->first_aio;
475
}
476

    
477
/* The one PosixAioState instance; NULL until paio_init() has succeeded. */
static PosixAioState *posix_aio_state;
478

    
479
static void aio_signal_handler(int signum)
480
{
481
    if (posix_aio_state) {
482
        char byte = 0;
483

    
484
        write(posix_aio_state->wfd, &byte, sizeof(byte));
485
    }
486

    
487
    qemu_service_io();
488
}
489

    
490
static void paio_remove(struct qemu_paiocb *acb)
491
{
492
    struct qemu_paiocb **pacb;
493

    
494
    /* remove the callback from the queue */
495
    pacb = &posix_aio_state->first_aio;
496
    for(;;) {
497
        if (*pacb == NULL) {
498
            fprintf(stderr, "paio_remove: aio request not found!\n");
499
            break;
500
        } else if (*pacb == acb) {
501
            *pacb = acb->next;
502
            qemu_aio_release(acb);
503
            break;
504
        }
505
        pacb = &(*pacb)->next;
506
    }
507
}
508

    
509
/*
 * Cancel callback of raw_aio_pool.
 *
 * A request no worker has picked up yet is taken off request_list and
 * marked -ECANCELED.  One that is already being processed cannot be
 * stopped, so this busy-waits until its result is available.  In both
 * cases the request is then removed from the pending list and released.
 */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int must_wait = 0;

    mutex_lock(&lock);
    if (acb->active) {
        if (acb->ret == -EINPROGRESS) {
            must_wait = 1;
        }
    } else {
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    }
    mutex_unlock(&lock);

    if (must_wait) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS) {
            /* spin */
        }
    }

    paio_remove(acb);
}
532

    
533
static AIOPool raw_aio_pool = {
534
    .aiocb_size         = sizeof(struct qemu_paiocb),
535
    .cancel             = paio_cancel,
536
};
537

    
538
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
539
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
540
        BlockDriverCompletionFunc *cb, void *opaque, int type)
541
{
542
    struct qemu_paiocb *acb;
543

    
544
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
545
    if (!acb)
546
        return NULL;
547
    acb->aio_type = type;
548
    acb->aio_fildes = fd;
549
    acb->ev_signo = SIGUSR2;
550
    if (qiov) {
551
        acb->aio_iov = qiov->iov;
552
        acb->aio_niov = qiov->niov;
553
    }
554
    acb->aio_nbytes = nb_sectors * 512;
555
    acb->aio_offset = sector_num * 512;
556

    
557
    acb->next = posix_aio_state->first_aio;
558
    posix_aio_state->first_aio = acb;
559

    
560
    qemu_paio_submit(acb);
561
    return &acb->common;
562
}
563

    
564
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
565
        unsigned long int req, void *buf,
566
        BlockDriverCompletionFunc *cb, void *opaque)
567
{
568
    struct qemu_paiocb *acb;
569

    
570
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
571
    if (!acb)
572
        return NULL;
573
    acb->aio_type = QEMU_AIO_IOCTL;
574
    acb->aio_fildes = fd;
575
    acb->ev_signo = SIGUSR2;
576
    acb->aio_offset = 0;
577
    acb->aio_ioctl_buf = buf;
578
    acb->aio_ioctl_cmd = req;
579

    
580
    acb->next = posix_aio_state->first_aio;
581
    posix_aio_state->first_aio = acb;
582

    
583
    qemu_paio_submit(acb);
584
    return &acb->common;
585
}
586

    
587
void *paio_init(void)
588
{
589
    struct sigaction act;
590
    PosixAioState *s;
591
    int fds[2];
592
    int ret;
593

    
594
    if (posix_aio_state)
595
        return posix_aio_state;
596

    
597
    s = qemu_malloc(sizeof(PosixAioState));
598

    
599
    sigfillset(&act.sa_mask);
600
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
601
    act.sa_handler = aio_signal_handler;
602
    sigaction(SIGUSR2, &act, NULL);
603

    
604
    s->first_aio = NULL;
605
    if (pipe(fds) == -1) {
606
        fprintf(stderr, "failed to create pipe\n");
607
        return NULL;
608
    }
609

    
610
    s->rfd = fds[0];
611
    s->wfd = fds[1];
612

    
613
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
614
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
615

    
616
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
617

    
618
    ret = pthread_attr_init(&attr);
619
    if (ret)
620
        die2(ret, "pthread_attr_init");
621

    
622
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
623
    if (ret)
624
        die2(ret, "pthread_attr_setdetachstate");
625

    
626
    QTAILQ_INIT(&request_list);
627

    
628
    posix_aio_state = s;
629

    
630
    return posix_aio_state;
631
}