
linux-aio.c @ 496eb021


/*
 * Linux native AIO support.
 *
 * Copyright (C) 2009 IBM, Corp.
 * Copyright (C) 2009 Red Hat, Inc.
 *
 * This work is licensed under the terms of the GNU GPL, version 2 or later.
 * See the COPYING file in the top-level directory.
 */
#include "qemu-common.h"
#include "qemu-aio.h"
#include "block_int.h"
#include "block/raw-posix-aio.h"

#include <sys/eventfd.h>
#include <libaio.h>

/*
 * Queue size (per-device).
 *
 * XXX: eventually we need to communicate this to the guest and/or make it
 *      tunable by the guest.  If we get more outstanding requests at a time
 *      than this we will get EAGAIN from io_submit which is communicated to
 *      the guest as an I/O error.
 */
#define MAX_EVENTS 128

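/* Tracking state for a single in-flight AIO request. */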
struct qemu_laiocb {
    BlockDriverAIOCB common;
    struct qemu_laio_state *ctx;
    struct iocb iocb;
    ssize_t ret;
    size_t nbytes;
};

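/*
 * Per-device AIO state: the kernel io context, the eventfd used for
 * completion notification and the number of requests currently in flight.
 */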
struct qemu_laio_state {
    io_context_t ctx;
    int efd;
    int count;
};

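/*
 * Fold the two result words of a completed io_event into a single signed
 * value: the number of bytes transferred on success, or a negative errno.
 */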
static inline ssize_t io_event_ret(struct io_event *ev)
{
    return (ssize_t)(((uint64_t)ev->res2 << 32) | ev->res);
}

static void qemu_laio_completion_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    while (1) {
        struct io_event events[MAX_EVENTS];
        uint64_t val;
        ssize_t ret;
        struct timespec ts = { 0 };
        int nevents, i;

        do {
            ret = read(s->efd, &val, sizeof(val));
        } while (ret == -1 && errno == EINTR);

        if (ret == -1 && errno == EAGAIN)
            break;

        /* a successful eventfd read always returns 8 bytes */
        if (ret != 8)
            break;

        do {
            nevents = io_getevents(s->ctx, val, MAX_EVENTS, events, &ts);
        } while (nevents == -EINTR);

        for (i = 0; i < nevents; i++) {
            struct iocb *iocb = events[i].obj;
            struct qemu_laiocb *laiocb =
                    container_of(iocb, struct qemu_laiocb, iocb);

            s->count--;

            ret = laiocb->ret = io_event_ret(&events[i]);
            if (ret != -ECANCELED) {
                /* a partial transfer counts as an error */
                if (ret == laiocb->nbytes)
                    ret = 0;
                else if (ret >= 0)
                    ret = -EINVAL;

                laiocb->common.cb(laiocb->common.opaque, ret);
            }

            qemu_aio_release(laiocb);
        }
    }
}

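/*
 * Report to the generic AIO layer whether this context still has requests
 * in flight, so that it keeps polling the eventfd until they are done.
 */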
static int qemu_laio_flush_cb(void *opaque)
{
    struct qemu_laio_state *s = opaque;

    return (s->count > 0) ? 1 : 0;
}

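/*
 * Try to cancel a pending request.  If the kernel refuses (the common
 * case), poll the io context until the request completes on its own.
 */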
static void laio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
    struct io_event event;
    int ret;

    if (laiocb->ret != -EINPROGRESS)
        return;

    /*
     * Note that as of Linux 2.6.31 neither the block device code nor any
     * filesystem implements cancellation of AIO requests.
     * Thus the polling loop below is the normal code path.
     */
    ret = io_cancel(laiocb->ctx->ctx, &laiocb->iocb, &event);
    if (ret == 0) {
        laiocb->ret = -ECANCELED;
        return;
    }

    /*
     * We have to wait for the iocb to finish.
     *
     * The only way to get the iocb status update is by polling the io context.
     * We might be able to do this slightly more optimally by removing the
     * O_NONBLOCK flag.
     */
    while (laiocb->ret == -EINPROGRESS)
        qemu_laio_completion_cb(laiocb->ctx);
}

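/* Pool descriptor for our AIOCBs: per-request state size and cancel hook. */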
static AIOPool laio_pool = {
    .aiocb_size         = sizeof(struct qemu_laiocb),
    .cancel             = laio_cancel,
};

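/*
 * Submit a single read or write request.  The sector-based request is
 * turned into a preadv/pwritev iocb, tagged with the context's eventfd
 * for completion notification and handed to the kernel via io_submit().
 */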
BlockDriverAIOCB *laio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_laio_state *s = aio_ctx;
    struct qemu_laiocb *laiocb;
    struct iocb *iocbs;
    off_t offset = sector_num * 512;

    laiocb = qemu_aio_get(&laio_pool, bs, cb, opaque);
    if (!laiocb)
        return NULL;
    laiocb->nbytes = nb_sectors * 512;
    laiocb->ctx = s;
    laiocb->ret = -EINPROGRESS;

    iocbs = &laiocb->iocb;

    switch (type) {
    case QEMU_AIO_WRITE:
        io_prep_pwritev(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    case QEMU_AIO_READ:
        io_prep_preadv(iocbs, fd, qiov->iov, qiov->niov, offset);
        break;
    default:
        fprintf(stderr, "%s: invalid AIO request type 0x%x.\n",
                        __func__, type);
        goto out_free_aiocb;
    }
    io_set_eventfd(&laiocb->iocb, s->efd);
    s->count++;

    if (io_submit(s->ctx, 1, &iocbs) < 0)
        goto out_dec_count;
    return &laiocb->common;

/* only undo the in-flight count on the path where it was incremented */
out_dec_count:
    s->count--;
out_free_aiocb:
    qemu_aio_release(laiocb);
    return NULL;
}

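/*
 * Allocate the per-device AIO state: create a non-blocking eventfd for
 * completion notification, set up the kernel AIO context and register
 * the eventfd with qemu's main loop.
 */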
void *laio_init(void)
{
    struct qemu_laio_state *s;

    s = qemu_mallocz(sizeof(*s));
    s->efd = eventfd(0, 0);
    if (s->efd == -1)
        goto out_free_state;
    fcntl(s->efd, F_SETFL, O_NONBLOCK);

    if (io_setup(MAX_EVENTS, &s->ctx) != 0)
        goto out_close_efd;

    qemu_aio_set_fd_handler(s->efd, qemu_laio_completion_cb,
                            NULL, qemu_laio_flush_cb, s);

    return s;

out_close_efd:
    close(s->efd);
out_free_state:
    qemu_free(s);
    return NULL;
}