Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 5493e33f

History | View | Annotate | Download (10.1 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <pthread.h>
16
#include <unistd.h>
17
#include <errno.h>
18
#include <time.h>
19
#include <string.h>
20
#include <stdlib.h>
21
#include <stdio.h>
22
#include "osdep.h"
23
#include "qemu-common.h"
24

    
25
#include "posix-aio-compat.h"
26

    
27
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
28
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
29
static pthread_t thread_id;
30
static pthread_attr_t attr;
31
static int max_threads = 64;
32
static int cur_threads = 0;
33
static int idle_threads = 0;
34
static TAILQ_HEAD(, qemu_paiocb) request_list;
35

    
36
#ifdef HAVE_PREADV
37
static int preadv_present = 1;
38
#else
39
static int preadv_present = 0;
40
#endif
41

    
42
/* Report a fatal error identified by @err and terminate the process. */
static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}
47

    
48
/* Fatal-error helper for calls that report failure through errno. */
static void die(const char *what)
{
    die2(errno, what);
}
52

    
53
/* Acquire @mutex; any pthread failure is fatal. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_lock(mutex);

    if (err)
        die2(err, "pthread_mutex_lock");
}
58

    
59
/* Release @mutex; any pthread failure is fatal. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err = pthread_mutex_unlock(mutex);

    if (err)
        die2(err, "pthread_mutex_unlock");
}
64

    
65
/*
 * Wait on @cond until signalled or until the absolute deadline @ts.
 * Returns 0 or ETIMEDOUT; every other pthread error is fatal.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int err = pthread_cond_timedwait(cond, mutex, ts);

    if (err && err != ETIMEDOUT)
        die2(err, "pthread_cond_timedwait");
    return err;
}
72

    
73
/* Wake one waiter on @cond; any pthread failure is fatal. */
static void cond_signal(pthread_cond_t *cond)
{
    int err = pthread_cond_signal(cond);

    if (err)
        die2(err, "pthread_cond_signal");
}
78

    
79
/* Start a new thread running @start_routine(@arg); failure is fatal. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err = pthread_create(thread, attr, start_routine, arg);

    if (err)
        die2(err, "pthread_create");
}
85

    
86
/*
 * Execute an ioctl request synchronously on behalf of a worker thread.
 *
 * Returns the negated errno value on failure.  On success it returns
 * aiocb->aio_nbytes -- see the comment below for why that is correct.
 *
 * NOTE(review): the return type is size_t, so negative errno values are
 * carried modulo-converted; callers only compare against aio_nbytes.
 */
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
{
        int ret;

        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
        if (ret == -1)
                return -errno;

        /*
         * This looks weird, but the aio code only considers a request
         * successful if it has written the full number of bytes.
         *
         * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
         * so in fact we return the ioctl command here to make posix_aio_read()
         * happy..
         */
        return aiocb->aio_nbytes;
}
104

    
105
#ifdef HAVE_PREADV
106

    
107
/* Vectored positional read: direct wrapper around preadv(2). */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    ssize_t done = preadv(fd, iov, nr_iov, offset);

    return done;
}
112

    
113
/* Vectored positional write: direct wrapper around pwritev(2). */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    ssize_t done = pwritev(fd, iov, nr_iov, offset);

    return done;
}
118

    
119
#else
120

    
121
/* Fallback when preadv() is unavailable on this host: never called
 * (preadv_present is 0), but keeps the build uniform. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}
126

    
127
/* Fallback when pwritev() is unavailable on this host: never called
 * (preadv_present is 0), but keeps the build uniform. */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}
132

    
133
#endif
134

    
135
/*
136
 * Check if we need to copy the data in the aiocb into a new
137
 * properly aligned buffer.
138
 */
139
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
140
{
141
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
142
        int i;
143

    
144
        for (i = 0; i < aiocb->aio_niov; i++)
145
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
146
                return 1;
147
    }
148

    
149
    return 0;
150
}
151

    
152
/*
 * Issue the whole request as a single preadv()/pwritev() call.
 *
 * Returns the byte count transferred, or a negated errno value
 * (encoded in the size_t return) on failure.
 *
 * NOTE(review): `offset` is never advanced, so a short transfer is not
 * retried here -- the caller falls back to the linear path instead.
 */
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    /* Restart the syscall if it was interrupted by a signal. */
    do {
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}
174

    
175
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
176
{
177
    size_t offset = 0;
178
    size_t len;
179

    
180
    while (offset < aiocb->aio_nbytes) {
181
         if (aiocb->aio_type == QEMU_PAIO_WRITE)
182
             len = pwrite(aiocb->aio_fildes,
183
                          (const char *)buf + offset,
184
                          aiocb->aio_nbytes - offset,
185
                          aiocb->aio_offset + offset);
186
         else
187
             len = pread(aiocb->aio_fildes,
188
                         buf + offset,
189
                         aiocb->aio_nbytes - offset,
190
                         aiocb->aio_offset + offset);
191

    
192
         if (len == -1 && errno == EINTR)
193
             continue;
194
         else if (len == -1) {
195
             offset = -errno;
196
             break;
197
         } else if (len == 0)
198
             break;
199

    
200
         offset += len;
201
    }
202

    
203
    return offset;
204
}
205

    
206
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
207
{
208
    size_t nbytes;
209
    char *buf;
210

    
211
    if (!aiocb_needs_copy(aiocb)) {
212
        /*
213
         * If there is just a single buffer, and it is properly aligned
214
         * we can just use plain pread/pwrite without any problems.
215
         */
216
        if (aiocb->aio_niov == 1)
217
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
218

    
219
        /*
220
         * We have more than one iovec, and all are properly aligned.
221
         *
222
         * Try preadv/pwritev first and fall back to linearizing the
223
         * buffer if it's not supported.
224
         */
225
        if (preadv_present) {
226
            nbytes = handle_aiocb_rw_vector(aiocb);
227
            if (nbytes == aiocb->aio_nbytes)
228
                return nbytes;
229
            if (nbytes < 0 && nbytes != -ENOSYS)
230
                return nbytes;
231
            preadv_present = 0;
232
        }
233

    
234
        /*
235
         * XXX(hch): short read/write.  no easy way to handle the reminder
236
         * using these interfaces.  For now retry using plain
237
         * pread/pwrite?
238
         */
239
    }
240

    
241
    /*
242
     * Ok, we have to do it the hard way, copy all segments into
243
     * a single aligned buffer.
244
     */
245
    buf = qemu_memalign(512, aiocb->aio_nbytes);
246
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
247
        char *p = buf;
248
        int i;
249

    
250
        for (i = 0; i < aiocb->aio_niov; ++i) {
251
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
252
            p += aiocb->aio_iov[i].iov_len;
253
        }
254
    }
255

    
256
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
257
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
258
        char *p = buf;
259
        size_t count = aiocb->aio_nbytes, copy;
260
        int i;
261

    
262
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
263
            copy = count;
264
            if (copy > aiocb->aio_iov[i].iov_len)
265
                copy = aiocb->aio_iov[i].iov_len;
266
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
267
            p     += copy;
268
            count -= copy;
269
        }
270
    }
271
    qemu_vfree(buf);
272

    
273
    return nbytes;
274
}
275

    
276
/*
 * Worker thread main loop: pull requests off request_list, execute
 * them, publish the result in aiocb->ret, and signal completion.
 *
 * A worker exits after roughly ten seconds with no work available;
 * qemu_paio_submit() spawns replacements on demand.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        size_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* Absolute deadline for the idle wait: now + 10 seconds. */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* Wait for work until the deadline (ret becomes ETIMEDOUT). */
        while (TAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* Timed out with nothing queued: retire this worker.
         * Note the break leaves the loop with `lock` still held. */
        if (TAILQ_EMPTY(&request_list))
            break;

        /* Claim the oldest request; `active` blocks cancellation. */
        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        /* Execute the request outside the lock. */
        switch (aiocb->aio_type) {
        case QEMU_PAIO_READ:
        case QEMU_PAIO_WRITE:
                ret = handle_aiocb_rw(aiocb);
                break;
        case QEMU_PAIO_IOCTL:
                ret = handle_aiocb_ioctl(aiocb);
                break;
        default:
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
                ret = -EINVAL;
                break;
        }

        /* Publish the result under the lock, then notify via signal. */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    /* Reached only via the timeout `break` above, with `lock` held. */
    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
341

    
342
static void spawn_thread(void)
343
{
344
    cur_threads++;
345
    idle_threads++;
346
    thread_create(&thread_id, &attr, aio_thread, NULL);
347
}
348

    
349
int qemu_paio_init(struct qemu_paioinit *aioinit)
350
{
351
    int ret;
352

    
353
    ret = pthread_attr_init(&attr);
354
    if (ret) die2(ret, "pthread_attr_init");
355

    
356
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
357
    if (ret) die2(ret, "pthread_attr_setdetachstate");
358

    
359
    TAILQ_INIT(&request_list);
360

    
361
    return 0;
362
}
363

    
364
/*
 * Common submission path: tag the request, queue it, and make sure a
 * worker thread will pick it up.  Always returns 0.
 */
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
{
    aiocb->aio_type = type;
    aiocb->ret = -EINPROGRESS;  /* placeholder until a worker finishes */
    aiocb->active = 0;          /* still cancellable while only queued */
    mutex_lock(&lock);
    /* Grow the pool only when nobody is idle and the cap allows it. */
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    /* Signalled outside the lock; the waiter re-checks the list anyway. */
    cond_signal(&cond);

    return 0;
}
378

    
379
int qemu_paio_read(struct qemu_paiocb *aiocb)
380
{
381
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
382
}
383

    
384
int qemu_paio_write(struct qemu_paiocb *aiocb)
385
{
386
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
387
}
388

    
389
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
390
{
391
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
392
}
393

    
394
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
395
{
396
    ssize_t ret;
397

    
398
    mutex_lock(&lock);
399
    ret = aiocb->ret;
400
    mutex_unlock(&lock);
401

    
402
    return ret;
403
}
404

    
405
/*
 * aio_error() analogue: map the stored result to a positive error
 * code (EINPROGRESS while pending), or 0 when the request succeeded.
 */
int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    return ret < 0 ? -ret : 0;
}
416

    
417
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
418
{
419
    int ret;
420

    
421
    mutex_lock(&lock);
422
    if (!aiocb->active) {
423
        TAILQ_REMOVE(&request_list, aiocb, node);
424
        aiocb->ret = -ECANCELED;
425
        ret = QEMU_PAIO_CANCELED;
426
    } else if (aiocb->ret == -EINPROGRESS)
427
        ret = QEMU_PAIO_NOTCANCELED;
428
    else
429
        ret = QEMU_PAIO_ALLDONE;
430
    mutex_unlock(&lock);
431

    
432
    return ret;
433
}