Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 94909d9f

History | View | Annotate | Download (9.8 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <pthread.h>
16
#include <unistd.h>
17
#include <errno.h>
18
#include <time.h>
19
#include <string.h>
20
#include <stdlib.h>
21
#include <stdio.h>
22
#include "osdep.h"
23
#include "qemu-common.h"
24

    
25
#include "posix-aio-compat.h"
26

    
27
/* `lock` protects the request list and the thread counters below;
 * `cond` is signalled when a new request is queued. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
/* Id of the most recently spawned worker (overwritten on each spawn;
 * threads are detached, so it is never joined). */
static pthread_t thread_id;
/* Thread attributes, configured as detached in qemu_paio_init(). */
static pthread_attr_t attr;
static int max_threads = 64;
static int cur_threads = 0;     /* workers alive (guarded by `lock`) */
static int idle_threads = 0;    /* workers waiting for work (guarded by `lock`) */
/* FIFO of pending requests; producers append, workers pop the head. */
static TAILQ_HEAD(, qemu_paiocb) request_list;

/* Nonzero while we believe the host supports preadv/pwritev; cleared at
 * runtime if the call comes back -ENOSYS. */
#ifdef HAVE_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
41

    
42
/* Report a fatal failure of `what` with error code `err` and abort. */
static void die2(int err, const char *what)
{
    const char *reason = strerror(err);

    fprintf(stderr, "%s failed: %s\n", what, reason);
    abort();
}
47

    
48
/* Like die2(), but takes the error code from errno. */
static void die(const char *what)
{
    die2(errno, what);
}
52

    
53
/* pthread_mutex_lock() wrapper that aborts on any failure. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err)
        die2(err, "pthread_mutex_lock");
}
58

    
59
/* pthread_mutex_unlock() wrapper that aborts on any failure. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err)
        die2(err, "pthread_mutex_unlock");
}
64

    
65
/*
 * pthread_cond_timedwait() wrapper.  A timeout (ETIMEDOUT) is passed
 * through to the caller; any other failure is fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int err = pthread_cond_timedwait(cond, mutex, ts);

    if (err != 0 && err != ETIMEDOUT)
        die2(err, "pthread_cond_timedwait");

    return err;
}
72

    
73
/* pthread_cond_signal() wrapper that aborts on any failure. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err)
        die2(err, "pthread_cond_signal");
}
78

    
79
/* pthread_create() wrapper that aborts on any failure. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err)
        die2(err, "pthread_create");
}
85

    
86
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
87
{
88
        int ret;
89

    
90
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
91
        if (ret == -1)
92
                return -errno;
93
        return ret;
94
}
95

    
96
#ifdef HAVE_PREADV

/* Thin wrappers so the rest of the file can call qemu_preadv/qemu_pwritev
 * unconditionally when the host provides the syscalls. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}

#else

/* Fallbacks for hosts without preadv/pwritev: return -ENOSYS so
 * handle_aiocb_rw() clears preadv_present and takes the linear path. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}

#endif
125

    
126
/*
127
 * Check if we need to copy the data in the aiocb into a new
128
 * properly aligned buffer.
129
 */
130
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
131
{
132
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
133
        int i;
134

    
135
        for (i = 0; i < aiocb->aio_niov; i++)
136
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
137
                return 1;
138
    }
139

    
140
    return 0;
141
}
142

    
143
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
144
{
145
    size_t offset = 0;
146
    ssize_t len;
147

    
148
    do {
149
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
150
            len = qemu_pwritev(aiocb->aio_fildes,
151
                               aiocb->aio_iov,
152
                               aiocb->aio_niov,
153
                               aiocb->aio_offset + offset);
154
         else
155
            len = qemu_preadv(aiocb->aio_fildes,
156
                              aiocb->aio_iov,
157
                              aiocb->aio_niov,
158
                              aiocb->aio_offset + offset);
159
    } while (len == -1 && errno == EINTR);
160

    
161
    if (len == -1)
162
        return -errno;
163
    return len;
164
}
165

    
166
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
167
{
168
    size_t offset = 0;
169
    size_t len;
170

    
171
    while (offset < aiocb->aio_nbytes) {
172
         if (aiocb->aio_type == QEMU_PAIO_WRITE)
173
             len = pwrite(aiocb->aio_fildes,
174
                          (const char *)buf + offset,
175
                          aiocb->aio_nbytes - offset,
176
                          aiocb->aio_offset + offset);
177
         else
178
             len = pread(aiocb->aio_fildes,
179
                         buf + offset,
180
                         aiocb->aio_nbytes - offset,
181
                         aiocb->aio_offset + offset);
182

    
183
         if (len == -1 && errno == EINTR)
184
             continue;
185
         else if (len == -1) {
186
             offset = -errno;
187
             break;
188
         } else if (len == 0)
189
             break;
190

    
191
         offset += len;
192
    }
193

    
194
    return offset;
195
}
196

    
197
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
198
{
199
    size_t nbytes;
200
    char *buf;
201

    
202
    if (!aiocb_needs_copy(aiocb)) {
203
        /*
204
         * If there is just a single buffer, and it is properly aligned
205
         * we can just use plain pread/pwrite without any problems.
206
         */
207
        if (aiocb->aio_niov == 1)
208
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
209

    
210
        /*
211
         * We have more than one iovec, and all are properly aligned.
212
         *
213
         * Try preadv/pwritev first and fall back to linearizing the
214
         * buffer if it's not supported.
215
         */
216
        if (preadv_present) {
217
            nbytes = handle_aiocb_rw_vector(aiocb);
218
            if (nbytes == aiocb->aio_nbytes)
219
                return nbytes;
220
            if (nbytes < 0 && nbytes != -ENOSYS)
221
                return nbytes;
222
            preadv_present = 0;
223
        }
224

    
225
        /*
226
         * XXX(hch): short read/write.  no easy way to handle the reminder
227
         * using these interfaces.  For now retry using plain
228
         * pread/pwrite?
229
         */
230
    }
231

    
232
    /*
233
     * Ok, we have to do it the hard way, copy all segments into
234
     * a single aligned buffer.
235
     */
236
    buf = qemu_memalign(512, aiocb->aio_nbytes);
237
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
238
        char *p = buf;
239
        int i;
240

    
241
        for (i = 0; i < aiocb->aio_niov; ++i) {
242
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
243
            p += aiocb->aio_iov[i].iov_len;
244
        }
245
    }
246

    
247
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
248
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
249
        char *p = buf;
250
        size_t count = aiocb->aio_nbytes, copy;
251
        int i;
252

    
253
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
254
            copy = count;
255
            if (copy > aiocb->aio_iov[i].iov_len)
256
                copy = aiocb->aio_iov[i].iov_len;
257
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
258
            p     += copy;
259
            count -= copy;
260
        }
261
    }
262
    qemu_vfree(buf);
263

    
264
    return nbytes;
265
}
266

    
267
/*
 * Worker thread main loop.  Pops requests off request_list, executes
 * them, stores the result in aiocb->ret, and notifies the main process
 * via the request's signal.  Exits after ~10s with no work.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        /* `ret` is reused: first for cond_timedwait's error code, then
         * for the request result (negative errno stored in a size_t). */
        size_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* Absolute deadline ~10s from now for the idle wait below. */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* Wait for work, giving up after the deadline passes. */
        while (TAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* Timed out with nothing queued: leave the loop.  NOTE: `lock`
         * is still held here; it is released after the loop. */
        if (TAILQ_EMPTY(&request_list))
            break;

        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);
        /* Mark active so qemu_paio_cancel() knows it can't be dequeued. */
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type) {
        case QEMU_PAIO_READ:
        case QEMU_PAIO_WRITE:
                ret = handle_aiocb_rw(aiocb);
                break;
        case QEMU_PAIO_IOCTL:
                ret = handle_aiocb_ioctl(aiocb);
                break;
        default:
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
                ret = -EINVAL;
                break;
        }

        /* Publish the result under the lock so qemu_paio_return() sees
         * a consistent value. */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        /* Notify the emulator that this request completed. */
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    /* Still holding `lock` from the break path above. */
    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
332

    
333
static void spawn_thread(void)
334
{
335
    cur_threads++;
336
    idle_threads++;
337
    thread_create(&thread_id, &attr, aio_thread, NULL);
338
}
339

    
340
int qemu_paio_init(struct qemu_paioinit *aioinit)
341
{
342
    int ret;
343

    
344
    ret = pthread_attr_init(&attr);
345
    if (ret) die2(ret, "pthread_attr_init");
346

    
347
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
348
    if (ret) die2(ret, "pthread_attr_setdetachstate");
349

    
350
    TAILQ_INIT(&request_list);
351

    
352
    return 0;
353
}
354

    
355
/*
 * Queue `aiocb` as a request of the given type and wake (or spawn) a
 * worker.  The request fields must be initialized before the aiocb is
 * visible on the list, hence the assignments outside the lock.
 * Always returns 0.
 */
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
{
    aiocb->aio_type = type;
    aiocb->ret = -EINPROGRESS;
    aiocb->active = 0;
    mutex_lock(&lock);
    /* Grow the pool only when nobody is idle and we are under the cap. */
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);

    return 0;
}
369

    
370
/* Queue an asynchronous read request.  Always returns 0. */
int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
}
374

    
375
/* Queue an asynchronous write request.  Always returns 0. */
int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
}
379

    
380
/* Queue an asynchronous ioctl request.  Always returns 0. */
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
}
384

    
385
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
386
{
387
    ssize_t ret;
388

    
389
    mutex_lock(&lock);
390
    ret = aiocb->ret;
391
    mutex_unlock(&lock);
392

    
393
    return ret;
394
}
395

    
396
/*
 * Return the request's error code as a positive errno value, or 0 when
 * it completed successfully (non-negative result).
 */
int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    return ret < 0 ? -ret : 0;
}
407

    
408
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
409
{
410
    int ret;
411

    
412
    mutex_lock(&lock);
413
    if (!aiocb->active) {
414
        TAILQ_REMOVE(&request_list, aiocb, node);
415
        aiocb->ret = -ECANCELED;
416
        ret = QEMU_PAIO_CANCELED;
417
    } else if (aiocb->ret == -EINPROGRESS)
418
        ret = QEMU_PAIO_NOTCANCELED;
419
    else
420
        ret = QEMU_PAIO_ALLDONE;
421
    mutex_unlock(&lock);
422

    
423
    return ret;
424
}