Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 72cf2d4f

History | View | Annotate | Download (14.8 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <sys/types.h>
16
#include <pthread.h>
17
#include <unistd.h>
18
#include <errno.h>
19
#include <time.h>
20
#include <signal.h>
21
#include <string.h>
22
#include <stdlib.h>
23
#include <stdio.h>
24

    
25
#include "qemu-queue.h"
26
#include "osdep.h"
27
#include "qemu-common.h"
28
#include "block_int.h"
29

    
30
#include "block/raw-posix-aio.h"
31

    
32

    
33
struct qemu_paiocb {
34
    BlockDriverAIOCB common;
35
    int aio_fildes;
36
    union {
37
        struct iovec *aio_iov;
38
        void *aio_ioctl_buf;
39
    };
40
    int aio_niov;
41
    size_t aio_nbytes;
42
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
43
    int ev_signo;
44
    off_t aio_offset;
45

    
46
    QTAILQ_ENTRY(qemu_paiocb) node;
47
    int aio_type;
48
    ssize_t ret;
49
    int active;
50
    struct qemu_paiocb *next;
51
};
52

    
53
typedef struct PosixAioState {
54
    int rfd, wfd;
55
    struct qemu_paiocb *first_aio;
56
} PosixAioState;
57

    
58

    
59
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
60
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
61
static pthread_t thread_id;
62
static pthread_attr_t attr;
63
static int max_threads = 64;
64
static int cur_threads = 0;
65
static int idle_threads = 0;
66
static QTAILQ_HEAD(, qemu_paiocb) request_list;
67

    
68
#ifdef CONFIG_PREADV
69
static int preadv_present = 1;
70
#else
71
static int preadv_present = 0;
72
#endif
73

    
74
static void die2(int err, const char *what)
75
{
76
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
77
    abort();
78
}
79

    
80
static void die(const char *what)
81
{
82
    die2(errno, what);
83
}
84

    
85
static void mutex_lock(pthread_mutex_t *mutex)
86
{
87
    int ret = pthread_mutex_lock(mutex);
88
    if (ret) die2(ret, "pthread_mutex_lock");
89
}
90

    
91
static void mutex_unlock(pthread_mutex_t *mutex)
92
{
93
    int ret = pthread_mutex_unlock(mutex);
94
    if (ret) die2(ret, "pthread_mutex_unlock");
95
}
96

    
97
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
98
                           struct timespec *ts)
99
{
100
    int ret = pthread_cond_timedwait(cond, mutex, ts);
101
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
102
    return ret;
103
}
104

    
105
static void cond_signal(pthread_cond_t *cond)
106
{
107
    int ret = pthread_cond_signal(cond);
108
    if (ret) die2(ret, "pthread_cond_signal");
109
}
110

    
111
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
112
                          void *(*start_routine)(void*), void *arg)
113
{
114
    int ret = pthread_create(thread, attr, start_routine, arg);
115
    if (ret) die2(ret, "pthread_create");
116
}
117

    
118
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
119
{
120
        int ret;
121

    
122
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
123
        if (ret == -1)
124
                return -errno;
125

    
126
        /*
127
         * This looks weird, but the aio code only consideres a request
128
         * successfull if it has written the number full number of bytes.
129
         *
130
         * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
131
         * so in fact we return the ioctl command here to make posix_aio_read()
132
         * happy..
133
         */
134
        return aiocb->aio_nbytes;
135
}
136

    
137
static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
138
{
139
    int ret;
140

    
141
    ret = qemu_fdatasync(aiocb->aio_fildes);
142
    if (ret == -1)
143
        return -errno;
144
    return 0;
145
}
146

    
147
#ifdef CONFIG_PREADV
148

    
149
static ssize_t
150
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
151
{
152
    return preadv(fd, iov, nr_iov, offset);
153
}
154

    
155
static ssize_t
156
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
157
{
158
    return pwritev(fd, iov, nr_iov, offset);
159
}
160

    
161
#else
162

    
163
static ssize_t
164
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
165
{
166
    return -ENOSYS;
167
}
168

    
169
static ssize_t
170
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
171
{
172
    return -ENOSYS;
173
}
174

    
175
#endif
176

    
177
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
178
{
179
    size_t offset = 0;
180
    ssize_t len;
181

    
182
    do {
183
        if (aiocb->aio_type & QEMU_AIO_WRITE)
184
            len = qemu_pwritev(aiocb->aio_fildes,
185
                               aiocb->aio_iov,
186
                               aiocb->aio_niov,
187
                               aiocb->aio_offset + offset);
188
         else
189
            len = qemu_preadv(aiocb->aio_fildes,
190
                              aiocb->aio_iov,
191
                              aiocb->aio_niov,
192
                              aiocb->aio_offset + offset);
193
    } while (len == -1 && errno == EINTR);
194

    
195
    if (len == -1)
196
        return -errno;
197
    return len;
198
}
199

    
200
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
201
{
202
    size_t offset = 0;
203
    size_t len;
204

    
205
    while (offset < aiocb->aio_nbytes) {
206
         if (aiocb->aio_type & QEMU_AIO_WRITE)
207
             len = pwrite(aiocb->aio_fildes,
208
                          (const char *)buf + offset,
209
                          aiocb->aio_nbytes - offset,
210
                          aiocb->aio_offset + offset);
211
         else
212
             len = pread(aiocb->aio_fildes,
213
                         buf + offset,
214
                         aiocb->aio_nbytes - offset,
215
                         aiocb->aio_offset + offset);
216

    
217
         if (len == -1 && errno == EINTR)
218
             continue;
219
         else if (len == -1) {
220
             offset = -errno;
221
             break;
222
         } else if (len == 0)
223
             break;
224

    
225
         offset += len;
226
    }
227

    
228
    return offset;
229
}
230

    
231
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
232
{
233
    size_t nbytes;
234
    char *buf;
235

    
236
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
237
        /*
238
         * If there is just a single buffer, and it is properly aligned
239
         * we can just use plain pread/pwrite without any problems.
240
         */
241
        if (aiocb->aio_niov == 1)
242
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
243

    
244
        /*
245
         * We have more than one iovec, and all are properly aligned.
246
         *
247
         * Try preadv/pwritev first and fall back to linearizing the
248
         * buffer if it's not supported.
249
         */
250
        if (preadv_present) {
251
            nbytes = handle_aiocb_rw_vector(aiocb);
252
            if (nbytes == aiocb->aio_nbytes)
253
                return nbytes;
254
            if (nbytes < 0 && nbytes != -ENOSYS)
255
                return nbytes;
256
            preadv_present = 0;
257
        }
258

    
259
        /*
260
         * XXX(hch): short read/write.  no easy way to handle the reminder
261
         * using these interfaces.  For now retry using plain
262
         * pread/pwrite?
263
         */
264
    }
265

    
266
    /*
267
     * Ok, we have to do it the hard way, copy all segments into
268
     * a single aligned buffer.
269
     */
270
    buf = qemu_memalign(512, aiocb->aio_nbytes);
271
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
272
        char *p = buf;
273
        int i;
274

    
275
        for (i = 0; i < aiocb->aio_niov; ++i) {
276
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
277
            p += aiocb->aio_iov[i].iov_len;
278
        }
279
    }
280

    
281
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
282
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
283
        char *p = buf;
284
        size_t count = aiocb->aio_nbytes, copy;
285
        int i;
286

    
287
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
288
            copy = count;
289
            if (copy > aiocb->aio_iov[i].iov_len)
290
                copy = aiocb->aio_iov[i].iov_len;
291
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
292
            p     += copy;
293
            count -= copy;
294
        }
295
    }
296
    qemu_vfree(buf);
297

    
298
    return nbytes;
299
}
300

    
301
static void *aio_thread(void *unused)
302
{
303
    pid_t pid;
304
    sigset_t set;
305

    
306
    pid = getpid();
307

    
308
    /* block all signals */
309
    if (sigfillset(&set)) die("sigfillset");
310
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
311

    
312
    while (1) {
313
        struct qemu_paiocb *aiocb;
314
        size_t ret = 0;
315
        qemu_timeval tv;
316
        struct timespec ts;
317

    
318
        qemu_gettimeofday(&tv);
319
        ts.tv_sec = tv.tv_sec + 10;
320
        ts.tv_nsec = 0;
321

    
322
        mutex_lock(&lock);
323

    
324
        while (QTAILQ_EMPTY(&request_list) &&
325
               !(ret == ETIMEDOUT)) {
326
            ret = cond_timedwait(&cond, &lock, &ts);
327
        }
328

    
329
        if (QTAILQ_EMPTY(&request_list))
330
            break;
331

    
332
        aiocb = QTAILQ_FIRST(&request_list);
333
        QTAILQ_REMOVE(&request_list, aiocb, node);
334
        aiocb->active = 1;
335
        idle_threads--;
336
        mutex_unlock(&lock);
337

    
338
        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
339
        case QEMU_AIO_READ:
340
        case QEMU_AIO_WRITE:
341
                ret = handle_aiocb_rw(aiocb);
342
                break;
343
        case QEMU_AIO_FLUSH:
344
                ret = handle_aiocb_flush(aiocb);
345
                break;
346
        case QEMU_AIO_IOCTL:
347
                ret = handle_aiocb_ioctl(aiocb);
348
                break;
349
        default:
350
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
351
                ret = -EINVAL;
352
                break;
353
        }
354

    
355
        mutex_lock(&lock);
356
        aiocb->ret = ret;
357
        idle_threads++;
358
        mutex_unlock(&lock);
359

    
360
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
361
    }
362

    
363
    idle_threads--;
364
    cur_threads--;
365
    mutex_unlock(&lock);
366

    
367
    return NULL;
368
}
369

    
370
static void spawn_thread(void)
371
{
372
    cur_threads++;
373
    idle_threads++;
374
    thread_create(&thread_id, &attr, aio_thread, NULL);
375
}
376

    
377
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
378
{
379
    aiocb->ret = -EINPROGRESS;
380
    aiocb->active = 0;
381
    mutex_lock(&lock);
382
    if (idle_threads == 0 && cur_threads < max_threads)
383
        spawn_thread();
384
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
385
    mutex_unlock(&lock);
386
    cond_signal(&cond);
387
}
388

    
389
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
390
{
391
    ssize_t ret;
392

    
393
    mutex_lock(&lock);
394
    ret = aiocb->ret;
395
    mutex_unlock(&lock);
396

    
397
    return ret;
398
}
399

    
400
static int qemu_paio_error(struct qemu_paiocb *aiocb)
401
{
402
    ssize_t ret = qemu_paio_return(aiocb);
403

    
404
    if (ret < 0)
405
        ret = -ret;
406
    else
407
        ret = 0;
408

    
409
    return ret;
410
}
411

    
412
static void posix_aio_read(void *opaque)
413
{
414
    PosixAioState *s = opaque;
415
    struct qemu_paiocb *acb, **pacb;
416
    int ret;
417
    ssize_t len;
418

    
419
    /* read all bytes from signal pipe */
420
    for (;;) {
421
        char bytes[16];
422

    
423
        len = read(s->rfd, bytes, sizeof(bytes));
424
        if (len == -1 && errno == EINTR)
425
            continue; /* try again */
426
        if (len == sizeof(bytes))
427
            continue; /* more to read */
428
        break;
429
    }
430

    
431
    for(;;) {
432
        pacb = &s->first_aio;
433
        for(;;) {
434
            acb = *pacb;
435
            if (!acb)
436
                goto the_end;
437
            ret = qemu_paio_error(acb);
438
            if (ret == ECANCELED) {
439
                /* remove the request */
440
                *pacb = acb->next;
441
                qemu_aio_release(acb);
442
            } else if (ret != EINPROGRESS) {
443
                /* end of aio */
444
                if (ret == 0) {
445
                    ret = qemu_paio_return(acb);
446
                    if (ret == acb->aio_nbytes)
447
                        ret = 0;
448
                    else
449
                        ret = -EINVAL;
450
                } else {
451
                    ret = -ret;
452
                }
453
                /* remove the request */
454
                *pacb = acb->next;
455
                /* call the callback */
456
                acb->common.cb(acb->common.opaque, ret);
457
                qemu_aio_release(acb);
458
                break;
459
            } else {
460
                pacb = &acb->next;
461
            }
462
        }
463
    }
464
 the_end: ;
465
}
466

    
467
static int posix_aio_flush(void *opaque)
468
{
469
    PosixAioState *s = opaque;
470
    return !!s->first_aio;
471
}
472

    
473
static PosixAioState *posix_aio_state;
474

    
475
static void aio_signal_handler(int signum)
476
{
477
    if (posix_aio_state) {
478
        char byte = 0;
479

    
480
        write(posix_aio_state->wfd, &byte, sizeof(byte));
481
    }
482

    
483
    qemu_service_io();
484
}
485

    
486
static void paio_remove(struct qemu_paiocb *acb)
487
{
488
    struct qemu_paiocb **pacb;
489

    
490
    /* remove the callback from the queue */
491
    pacb = &posix_aio_state->first_aio;
492
    for(;;) {
493
        if (*pacb == NULL) {
494
            fprintf(stderr, "paio_remove: aio request not found!\n");
495
            break;
496
        } else if (*pacb == acb) {
497
            *pacb = acb->next;
498
            qemu_aio_release(acb);
499
            break;
500
        }
501
        pacb = &(*pacb)->next;
502
    }
503
}
504

    
505
static void paio_cancel(BlockDriverAIOCB *blockacb)
506
{
507
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
508
    int active = 0;
509

    
510
    mutex_lock(&lock);
511
    if (!acb->active) {
512
        QTAILQ_REMOVE(&request_list, acb, node);
513
        acb->ret = -ECANCELED;
514
    } else if (acb->ret == -EINPROGRESS) {
515
        active = 1;
516
    }
517
    mutex_unlock(&lock);
518

    
519
    if (active) {
520
        /* fail safe: if the aio could not be canceled, we wait for
521
           it */
522
        while (qemu_paio_error(acb) == EINPROGRESS)
523
            ;
524
    }
525

    
526
    paio_remove(acb);
527
}
528

    
529
static AIOPool raw_aio_pool = {
530
    .aiocb_size         = sizeof(struct qemu_paiocb),
531
    .cancel             = paio_cancel,
532
};
533

    
534
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
535
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
536
        BlockDriverCompletionFunc *cb, void *opaque, int type)
537
{
538
    struct qemu_paiocb *acb;
539

    
540
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
541
    if (!acb)
542
        return NULL;
543
    acb->aio_type = type;
544
    acb->aio_fildes = fd;
545
    acb->ev_signo = SIGUSR2;
546
    if (qiov) {
547
        acb->aio_iov = qiov->iov;
548
        acb->aio_niov = qiov->niov;
549
    }
550
    acb->aio_nbytes = nb_sectors * 512;
551
    acb->aio_offset = sector_num * 512;
552

    
553
    acb->next = posix_aio_state->first_aio;
554
    posix_aio_state->first_aio = acb;
555

    
556
    qemu_paio_submit(acb);
557
    return &acb->common;
558
}
559

    
560
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
561
        unsigned long int req, void *buf,
562
        BlockDriverCompletionFunc *cb, void *opaque)
563
{
564
    struct qemu_paiocb *acb;
565

    
566
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
567
    if (!acb)
568
        return NULL;
569
    acb->aio_type = QEMU_AIO_IOCTL;
570
    acb->aio_fildes = fd;
571
    acb->ev_signo = SIGUSR2;
572
    acb->aio_offset = 0;
573
    acb->aio_ioctl_buf = buf;
574
    acb->aio_ioctl_cmd = req;
575

    
576
    acb->next = posix_aio_state->first_aio;
577
    posix_aio_state->first_aio = acb;
578

    
579
    qemu_paio_submit(acb);
580
    return &acb->common;
581
}
582

    
583
void *paio_init(void)
584
{
585
    struct sigaction act;
586
    PosixAioState *s;
587
    int fds[2];
588
    int ret;
589

    
590
    if (posix_aio_state)
591
        return posix_aio_state;
592

    
593
    s = qemu_malloc(sizeof(PosixAioState));
594

    
595
    sigfillset(&act.sa_mask);
596
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
597
    act.sa_handler = aio_signal_handler;
598
    sigaction(SIGUSR2, &act, NULL);
599

    
600
    s->first_aio = NULL;
601
    if (pipe(fds) == -1) {
602
        fprintf(stderr, "failed to create pipe\n");
603
        return NULL;
604
    }
605

    
606
    s->rfd = fds[0];
607
    s->wfd = fds[1];
608

    
609
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
610
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
611

    
612
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush, s);
613

    
614
    ret = pthread_attr_init(&attr);
615
    if (ret)
616
        die2(ret, "pthread_attr_init");
617

    
618
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
619
    if (ret)
620
        die2(ret, "pthread_attr_setdetachstate");
621

    
622
    QTAILQ_INIT(&request_list);
623

    
624
    posix_aio_state = s;
625

    
626
    return posix_aio_state;
627
}