Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 8febfa26

History | View | Annotate | Download (15.1 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <sys/types.h>
16
#include <pthread.h>
17
#include <unistd.h>
18
#include <errno.h>
19
#include <time.h>
20
#include <signal.h>
21
#include <string.h>
22
#include <stdlib.h>
23
#include <stdio.h>
24

    
25
#include "qemu-queue.h"
26
#include "osdep.h"
27
#include "qemu-common.h"
28
#include "block_int.h"
29

    
30
#include "block/raw-posix-aio.h"
31

    
32

    
33
/*
 * One outstanding asynchronous request as tracked by the worker-thread
 * pool.  While pending it sits on the global request_list; from submission
 * until its callback runs it is also linked on PosixAioState.first_aio.
 */
struct qemu_paiocb {
    BlockDriverAIOCB common;        /* generic AIOCB header; must be first */
    int aio_fildes;                 /* file descriptor to operate on */
    union {
        struct iovec *aio_iov;      /* scatter/gather list for read/write */
        void *aio_ioctl_buf;        /* argument buffer for QEMU_AIO_IOCTL */
    };
    int aio_niov;                   /* number of entries in aio_iov */
    size_t aio_nbytes;              /* total byte count of the request */
#define aio_ioctl_cmd   aio_nbytes /* for QEMU_AIO_IOCTL */
    int ev_signo;                   /* signal raised on completion */
    off_t aio_offset;               /* starting file offset */

    QTAILQ_ENTRY(qemu_paiocb) node; /* linkage on request_list (pending) */
    int aio_type;                   /* QEMU_AIO_{READ,WRITE,FLUSH,IOCTL} + flags */
    ssize_t ret;                    /* result; -EINPROGRESS while running */
    int active;                     /* set once a worker thread picked it up */
    struct qemu_paiocb *next;       /* linkage on first_aio completion list */
};
52

    
53
/*
 * Singleton state: the completion-notification pipe written by the signal
 * handler, and the list of submitted-but-not-yet-reaped requests.
 */
typedef struct PosixAioState {
    int rfd, wfd;                   /* read/write ends of the signal pipe */
    struct qemu_paiocb *first_aio;  /* singly-linked via qemu_paiocb.next */
} PosixAioState;
57

    
58

    
59
/* Thread-pool state; the counters and request_list below are protected by
 * 'lock'. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;  /* signalled on new work */
static pthread_t thread_id;         /* id of the most recently spawned worker */
static pthread_attr_t attr;         /* detached-thread attrs, set in paio_init() */
static int max_threads = 64;        /* upper bound on pool size */
static int cur_threads = 0;         /* currently live worker threads */
static int idle_threads = 0;        /* workers waiting for requests */
static QTAILQ_HEAD(, qemu_paiocb) request_list;     /* pending requests */

/* Whether host preadv()/pwritev() may be used; cleared at runtime by
 * handle_aiocb_rw() if the syscall turns out to be unavailable. */
#ifdef CONFIG_PREADV
static int preadv_present = 1;
#else
static int preadv_present = 0;
#endif
73

    
74
/* Report that WHAT failed with error code ERR and abort the process. */
static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}
79

    
80
/* Like die2(), taking the error code from errno. */
static void die(const char *what)
{
    die2(errno, what);
}
84

    
85
/* pthread_mutex_lock() wrapper that aborts on failure. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_lock(mutex);
    if (ret) die2(ret, "pthread_mutex_lock");
}
90

    
91
/* pthread_mutex_unlock() wrapper that aborts on failure. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int ret = pthread_mutex_unlock(mutex);
    if (ret) die2(ret, "pthread_mutex_unlock");
}
96

    
97
/* pthread_cond_timedwait() wrapper: returns 0 or ETIMEDOUT; any other
 * error aborts the process. */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int ret = pthread_cond_timedwait(cond, mutex, ts);
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
    return ret;
}
104

    
105
/* pthread_cond_signal() wrapper that aborts on failure. */
static void cond_signal(pthread_cond_t *cond)
{
    int ret = pthread_cond_signal(cond);
    if (ret) die2(ret, "pthread_cond_signal");
}
110

    
111
/* pthread_create() wrapper that aborts on failure. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int ret = pthread_create(thread, attr, start_routine, arg);
    if (ret) die2(ret, "pthread_create");
}
117

    
118
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
119
{
120
        int ret;
121

    
122
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
123
        if (ret == -1)
124
                return -errno;
125

    
126
        /*
127
         * This looks weird, but the aio code only consideres a request
128
         * successfull if it has written the number full number of bytes.
129
         *
130
         * Now we overload aio_nbytes as aio_ioctl_cmd for the ioctl command,
131
         * so in fact we return the ioctl command here to make posix_aio_read()
132
         * happy..
133
         */
134
        return aiocb->aio_nbytes;
135
}
136

    
137
/* Flush the request's file descriptor to stable storage; returns 0 on
 * success or -errno on failure. */
static size_t handle_aiocb_flush(struct qemu_paiocb *aiocb)
{
    int ret;

    ret = qemu_fdatasync(aiocb->aio_fildes);
    if (ret == -1)
        return -errno;
    return 0;
}
146

    
147
#ifdef CONFIG_PREADV
148

    
149
/* Thin wrapper over the host preadv(); returns bytes read or -1/errno. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return preadv(fd, iov, nr_iov, offset);
}
154

    
155
/* Thin wrapper over the host pwritev(); returns bytes written or -1/errno. */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return pwritev(fd, iov, nr_iov, offset);
}
160

    
161
#else
162

    
163
/* No host preadv(): report ENOSYS so handle_aiocb_rw() falls back to the
 * bounce-buffered pread() path. */
static ssize_t
qemu_preadv(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}
168

    
169
/* No host pwritev(): report ENOSYS so handle_aiocb_rw() falls back to the
 * bounce-buffered pwrite() path. */
static ssize_t
qemu_pwritev(int fd, const struct iovec *iov, int nr_iov, off_t offset)
{
    return -ENOSYS;
}
174

    
175
#endif
176

    
177
/*
 * Issue the whole request as a single preadv()/pwritev() call, retrying
 * on EINTR.  Returns the byte count transferred, or -errno encoded in the
 * size_t return value (the convention shared by all handle_aiocb_*
 * helpers).  NOTE(review): 'offset' is never advanced, so a short
 * transfer is returned as-is; the caller treats it per the XXX comment in
 * handle_aiocb_rw().
 */
static size_t handle_aiocb_rw_vector(struct qemu_paiocb *aiocb)
{
    size_t offset = 0;
    ssize_t len;

    do {
        if (aiocb->aio_type & QEMU_AIO_WRITE)
            len = qemu_pwritev(aiocb->aio_fildes,
                               aiocb->aio_iov,
                               aiocb->aio_niov,
                               aiocb->aio_offset + offset);
         else
            len = qemu_preadv(aiocb->aio_fildes,
                              aiocb->aio_iov,
                              aiocb->aio_niov,
                              aiocb->aio_offset + offset);
    } while (len == -1 && errno == EINTR);

    if (len == -1)
        return -errno;
    return len;
}
199

    
200
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
201
{
202
    size_t offset = 0;
203
    size_t len;
204

    
205
    while (offset < aiocb->aio_nbytes) {
206
         if (aiocb->aio_type & QEMU_AIO_WRITE)
207
             len = pwrite(aiocb->aio_fildes,
208
                          (const char *)buf + offset,
209
                          aiocb->aio_nbytes - offset,
210
                          aiocb->aio_offset + offset);
211
         else
212
             len = pread(aiocb->aio_fildes,
213
                         buf + offset,
214
                         aiocb->aio_nbytes - offset,
215
                         aiocb->aio_offset + offset);
216

    
217
         if (len == -1 && errno == EINTR)
218
             continue;
219
         else if (len == -1) {
220
             offset = -errno;
221
             break;
222
         } else if (len == 0)
223
             break;
224

    
225
         offset += len;
226
    }
227

    
228
    return offset;
229
}
230

    
231
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
232
{
233
    size_t nbytes;
234
    char *buf;
235

    
236
    if (!(aiocb->aio_type & QEMU_AIO_MISALIGNED)) {
237
        /*
238
         * If there is just a single buffer, and it is properly aligned
239
         * we can just use plain pread/pwrite without any problems.
240
         */
241
        if (aiocb->aio_niov == 1)
242
             return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
243

    
244
        /*
245
         * We have more than one iovec, and all are properly aligned.
246
         *
247
         * Try preadv/pwritev first and fall back to linearizing the
248
         * buffer if it's not supported.
249
         */
250
        if (preadv_present) {
251
            nbytes = handle_aiocb_rw_vector(aiocb);
252
            if (nbytes == aiocb->aio_nbytes)
253
                return nbytes;
254
            if (nbytes < 0 && nbytes != -ENOSYS)
255
                return nbytes;
256
            preadv_present = 0;
257
        }
258

    
259
        /*
260
         * XXX(hch): short read/write.  no easy way to handle the reminder
261
         * using these interfaces.  For now retry using plain
262
         * pread/pwrite?
263
         */
264
    }
265

    
266
    /*
267
     * Ok, we have to do it the hard way, copy all segments into
268
     * a single aligned buffer.
269
     */
270
    buf = qemu_memalign(512, aiocb->aio_nbytes);
271
    if (aiocb->aio_type & QEMU_AIO_WRITE) {
272
        char *p = buf;
273
        int i;
274

    
275
        for (i = 0; i < aiocb->aio_niov; ++i) {
276
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
277
            p += aiocb->aio_iov[i].iov_len;
278
        }
279
    }
280

    
281
    nbytes = handle_aiocb_rw_linear(aiocb, buf);
282
    if (!(aiocb->aio_type & QEMU_AIO_WRITE)) {
283
        char *p = buf;
284
        size_t count = aiocb->aio_nbytes, copy;
285
        int i;
286

    
287
        for (i = 0; i < aiocb->aio_niov && count; ++i) {
288
            copy = count;
289
            if (copy > aiocb->aio_iov[i].iov_len)
290
                copy = aiocb->aio_iov[i].iov_len;
291
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
292
            p     += copy;
293
            count -= copy;
294
        }
295
    }
296
    qemu_vfree(buf);
297

    
298
    return nbytes;
299
}
300

    
301
/*
 * Worker thread main loop: take requests off request_list, execute them
 * synchronously, publish the result and notify the main loop by raising
 * ev_signo (whose handler writes to the completion pipe).  A thread that
 * sees no work for ~10 seconds exits, shrinking the pool.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;

    pid = getpid();

    while (1) {
        struct qemu_paiocb *aiocb;
        size_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* absolute deadline: stop waiting for work after 10 seconds */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* 'ret' temporarily holds the cond_timedwait() result here
         * (0 or ETIMEDOUT); it is reused for the request result below */
        while (QTAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* timed out with nothing queued: leave the loop with the lock
         * still held; it is released after the loop */
        if (QTAILQ_EMPTY(&request_list))
            break;

        aiocb = QTAILQ_FIRST(&request_list);
        QTAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;  /* from now on paio_cancel() must wait for us */
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type & QEMU_AIO_TYPE_MASK) {
        case QEMU_AIO_READ:
        case QEMU_AIO_WRITE:
                ret = handle_aiocb_rw(aiocb);
                break;
        case QEMU_AIO_FLUSH:
                ret = handle_aiocb_flush(aiocb);
                break;
        case QEMU_AIO_IOCTL:
                ret = handle_aiocb_ioctl(aiocb);
                break;
        default:
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
                ret = -EINVAL;
                break;
        }

        /* publish the result under the lock so qemu_paio_return() reads a
         * consistent value */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        /* wake the main loop: signal handler writes to the pipe */
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
364

    
365
/*
 * Start one more detached worker thread.  Called with 'lock' held by
 * qemu_paio_submit(), which is why the counters need no extra locking.
 * All signals are blocked around pthread_create() so the worker inherits
 * a full signal mask and never runs the completion signal handler.
 */
static void spawn_thread(void)
{
    sigset_t set, oldset;

    cur_threads++;
    idle_threads++;

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_SETMASK, &set, &oldset)) die("sigprocmask");

    thread_create(&thread_id, &attr, aio_thread, NULL);

    if (sigprocmask(SIG_SETMASK, &oldset, NULL)) die("sigprocmask restore");
}
380

    
381
/*
 * Queue AIOCB for the worker pool, spawning a new thread if none is idle
 * and the pool limit allows.  The condition variable is signalled after
 * dropping the lock, which is valid pthread usage.
 */
static void qemu_paio_submit(struct qemu_paiocb *aiocb)
{
    aiocb->ret = -EINPROGRESS;  /* marks the request as still running */
    aiocb->active = 0;
    mutex_lock(&lock);
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    QTAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    cond_signal(&cond);
}
392

    
393
/* Read the request's current result (-EINPROGRESS while pending) under
 * the pool lock. */
static ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}
403

    
404
/*
 * Error status of AIOCB in aio_error() style: a positive errno value
 * (EINPROGRESS while the request is still running, ECANCELED if it was
 * cancelled), or 0 on success.
 */
static int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t ret = qemu_paio_return(aiocb);

    return ret < 0 ? -ret : 0;
}
415

    
416
/*
 * Walk the first_aio list, unlinking every finished request and invoking
 * its callback.  Returns non-zero if anything was reaped.  After each
 * completion the scan restarts from the head, because the callback may
 * submit or cancel requests and thus mutate the list.
 */
static int posix_aio_process_queue(void *opaque)
{
    PosixAioState *s = opaque;
    struct qemu_paiocb *acb, **pacb;
    int ret;
    int result = 0;

    for(;;) {
        pacb = &s->first_aio;
        for(;;) {
            acb = *pacb;
            if (!acb)
                return result;      /* end of list: everything reaped */
            ret = qemu_paio_error(acb);
            if (ret == ECANCELED) {
                /* remove the request */
                *pacb = acb->next;
                qemu_aio_release(acb);
                result = 1;
            } else if (ret != EINPROGRESS) {
                /* end of aio */
                if (ret == 0) {
                    ret = qemu_paio_return(acb);
                    /* only a full-length transfer counts as success;
                     * ioctls fake this by returning aio_ioctl_cmd */
                    if (ret == acb->aio_nbytes)
                        ret = 0;
                    else
                        ret = -EINVAL;
                } else {
                    ret = -ret;
                }
                /* remove the request */
                *pacb = acb->next;
                /* call the callback */
                acb->common.cb(acb->common.opaque, ret);
                qemu_aio_release(acb);
                result = 1;
                break;      /* callback may have changed the list: rescan */
            } else {
                pacb = &acb->next;  /* still in flight: skip it */
            }
        }
    }

    return result;
}
461

    
462
/*
 * fd handler for the read end of the completion pipe: drain all pending
 * notification bytes (the fd is non-blocking, so a partial read means the
 * pipe is empty), then reap completed requests.
 */
static void posix_aio_read(void *opaque)
{
    PosixAioState *s = opaque;
    ssize_t len;

    /* read all bytes from signal pipe */
    for (;;) {
        char bytes[16];

        len = read(s->rfd, bytes, sizeof(bytes));
        if (len == -1 && errno == EINTR)
            continue; /* try again */
        if (len == sizeof(bytes))
            continue; /* more to read */
        break;
    }

    posix_aio_process_queue(s);
}
481

    
482
static int posix_aio_flush(void *opaque)
483
{
484
    PosixAioState *s = opaque;
485
    return !!s->first_aio;
486
}
487

    
488
static PosixAioState *posix_aio_state;
489

    
490
/*
 * SIGUSR2 handler: wake the main loop by writing one byte to the
 * completion pipe (write() is async-signal-safe).
 * NOTE(review): the write() result is ignored; presumably a full
 * (non-blocking) pipe is harmless because unread bytes already force a
 * wakeup — confirm.
 */
static void aio_signal_handler(int signum)
{
    if (posix_aio_state) {
        char byte = 0;

        write(posix_aio_state->wfd, &byte, sizeof(byte));
    }

    qemu_service_io();
}
500

    
501
/*
 * Unlink ACB from the first_aio completion list and release it.  A
 * missing entry is logged but otherwise ignored.
 */
static void paio_remove(struct qemu_paiocb *acb)
{
    struct qemu_paiocb **pacb;

    /* remove the callback from the queue */
    pacb = &posix_aio_state->first_aio;
    for(;;) {
        if (*pacb == NULL) {
            fprintf(stderr, "paio_remove: aio request not found!\n");
            break;
        } else if (*pacb == acb) {
            *pacb = acb->next;
            qemu_aio_release(acb);
            break;
        }
        pacb = &(*pacb)->next;
    }
}
519

    
520
/*
 * Cancel callback for raw_aio_pool.  A request still on request_list is
 * simply removed and marked -ECANCELED; one already claimed by a worker
 * cannot be aborted, so we busy-wait until it finishes.  Either way the
 * request is unlinked without its completion callback running.
 */
static void paio_cancel(BlockDriverAIOCB *blockacb)
{
    struct qemu_paiocb *acb = (struct qemu_paiocb *)blockacb;
    int active = 0;

    mutex_lock(&lock);
    if (!acb->active) {
        /* not yet picked up: pull it off before any worker sees it */
        QTAILQ_REMOVE(&request_list, acb, node);
        acb->ret = -ECANCELED;
    } else if (acb->ret == -EINPROGRESS) {
        active = 1;
    }
    mutex_unlock(&lock);

    if (active) {
        /* fail safe: if the aio could not be canceled, we wait for
           it */
        while (qemu_paio_error(acb) == EINPROGRESS)
            ;
    }

    paio_remove(acb);
}
543

    
544
/* AIOCB pool for this backend: sized for qemu_paiocb, cancelled via
 * paio_cancel(). */
static AIOPool raw_aio_pool = {
    .aiocb_size         = sizeof(struct qemu_paiocb),
    .cancel             = paio_cancel,
};
548

    
549
/*
 * Public entry point: build a qemu_paiocb for a read/write/flush request
 * on FD, link it on the completion list and hand it to the thread pool.
 * Returns the generic AIOCB, or NULL if allocation failed.
 * NOTE(review): nb_sectors * 512 is computed in int arithmetic before
 * widening to size_t/off_t — presumably callers never exceed 2^31 bytes
 * per request; confirm if larger requests are possible.
 */
BlockDriverAIOCB *paio_submit(BlockDriverState *bs, void *aio_ctx, int fd,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int type)
{
    struct qemu_paiocb *acb;

    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
    if (!acb)
        return NULL;
    acb->aio_type = type;
    acb->aio_fildes = fd;
    acb->ev_signo = SIGUSR2;    /* completion notification signal */
    if (qiov) {
        acb->aio_iov = qiov->iov;
        acb->aio_niov = qiov->niov;
    }
    acb->aio_nbytes = nb_sectors * 512;
    acb->aio_offset = sector_num * 512;

    /* make the request visible to posix_aio_process_queue() */
    acb->next = posix_aio_state->first_aio;
    posix_aio_state->first_aio = acb;

    qemu_paio_submit(acb);
    return &acb->common;
}
574

    
575
BlockDriverAIOCB *paio_ioctl(BlockDriverState *bs, int fd,
576
        unsigned long int req, void *buf,
577
        BlockDriverCompletionFunc *cb, void *opaque)
578
{
579
    struct qemu_paiocb *acb;
580

    
581
    acb = qemu_aio_get(&raw_aio_pool, bs, cb, opaque);
582
    if (!acb)
583
        return NULL;
584
    acb->aio_type = QEMU_AIO_IOCTL;
585
    acb->aio_fildes = fd;
586
    acb->ev_signo = SIGUSR2;
587
    acb->aio_offset = 0;
588
    acb->aio_ioctl_buf = buf;
589
    acb->aio_ioctl_cmd = req;
590

    
591
    acb->next = posix_aio_state->first_aio;
592
    posix_aio_state->first_aio = acb;
593

    
594
    qemu_paio_submit(acb);
595
    return &acb->common;
596
}
597

    
598
void *paio_init(void)
599
{
600
    struct sigaction act;
601
    PosixAioState *s;
602
    int fds[2];
603
    int ret;
604

    
605
    if (posix_aio_state)
606
        return posix_aio_state;
607

    
608
    s = qemu_malloc(sizeof(PosixAioState));
609

    
610
    sigfillset(&act.sa_mask);
611
    act.sa_flags = 0; /* do not restart syscalls to interrupt select() */
612
    act.sa_handler = aio_signal_handler;
613
    sigaction(SIGUSR2, &act, NULL);
614

    
615
    s->first_aio = NULL;
616
    if (pipe(fds) == -1) {
617
        fprintf(stderr, "failed to create pipe\n");
618
        return NULL;
619
    }
620

    
621
    s->rfd = fds[0];
622
    s->wfd = fds[1];
623

    
624
    fcntl(s->rfd, F_SETFL, O_NONBLOCK);
625
    fcntl(s->wfd, F_SETFL, O_NONBLOCK);
626

    
627
    qemu_aio_set_fd_handler(s->rfd, posix_aio_read, NULL, posix_aio_flush,
628
        posix_aio_process_queue, s);
629

    
630
    ret = pthread_attr_init(&attr);
631
    if (ret)
632
        die2(ret, "pthread_attr_init");
633

    
634
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
635
    if (ret)
636
        die2(ret, "pthread_attr_setdetachstate");
637

    
638
    QTAILQ_INIT(&request_list);
639

    
640
    posix_aio_state = s;
641

    
642
    return posix_aio_state;
643
}