Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ f141eafe

History | View | Annotate | Download (7.9 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <pthread.h>
16
#include <unistd.h>
17
#include <errno.h>
18
#include <time.h>
19
#include <string.h>
20
#include <stdlib.h>
21
#include <stdio.h>
22
#include "osdep.h"
23
#include "qemu-common.h"
24

    
25
#include "posix-aio-compat.h"
26

    
27
/* Thread-pool state for the emulated POSIX AIO implementation.
 * All mutable fields below are protected by 'lock' unless noted. */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
/* Signalled whenever a new request is appended to request_list. */
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static pthread_t thread_id;
/* Shared attributes (detached state, set up in qemu_paio_init) used
 * for every worker thread. */
static pthread_attr_t attr;
/* Upper bound on the number of concurrently running worker threads. */
static int max_threads = 64;
/* Number of worker threads currently alive. */
static int cur_threads = 0;
/* Number of worker threads currently waiting for work. */
static int idle_threads = 0;
/* FIFO of pending (not yet active) AIO requests. */
static TAILQ_HEAD(, qemu_paiocb) request_list;
35

    
36
/* Report an unrecoverable failure and abort.  Used for pthread-style
 * APIs that return the error code directly instead of setting errno. */
static void die2(int err, const char *what)
{
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
    abort();
}
41

    
42
/* Convenience wrapper around die2() for calls that report via errno. */
static void die(const char *what)
{
    die2(errno, what);
}
46

    
47
/* pthread_mutex_lock() wrapper that aborts on any failure. */
static void mutex_lock(pthread_mutex_t *mutex)
{
    int err;

    err = pthread_mutex_lock(mutex);
    if (err)
        die2(err, "pthread_mutex_lock");
}
52

    
53
/* pthread_mutex_unlock() wrapper that aborts on any failure. */
static void mutex_unlock(pthread_mutex_t *mutex)
{
    int err;

    err = pthread_mutex_unlock(mutex);
    if (err)
        die2(err, "pthread_mutex_unlock");
}
58

    
59
/*
 * pthread_cond_timedwait() wrapper.  A timeout is a normal outcome and
 * is returned to the caller (as ETIMEDOUT); any other failure aborts.
 */
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
                           struct timespec *ts)
{
    int err;

    err = pthread_cond_timedwait(cond, mutex, ts);
    if (err && err != ETIMEDOUT)
        die2(err, "pthread_cond_timedwait");
    return err;
}
66

    
67
/* pthread_cond_signal() wrapper that aborts on any failure. */
static void cond_signal(pthread_cond_t *cond)
{
    int err;

    err = pthread_cond_signal(cond);
    if (err)
        die2(err, "pthread_cond_signal");
}
72

    
73
/* pthread_create() wrapper that aborts on any failure. */
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
                          void *(*start_routine)(void*), void *arg)
{
    int err;

    err = pthread_create(thread, attr, start_routine, arg);
    if (err)
        die2(err, "pthread_create");
}
79

    
80
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
81
{
82
        int ret;
83

    
84
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_ioctl_buf);
85
        if (ret == -1)
86
                return -errno;
87
        return ret;
88
}
89

    
90
/*
91
 * Check if we need to copy the data in the aiocb into a new
92
 * properly aligned buffer.
93
 */
94
static int aiocb_needs_copy(struct qemu_paiocb *aiocb)
95
{
96
    if (aiocb->aio_flags & QEMU_AIO_SECTOR_ALIGNED) {
97
        int i;
98

    
99
        for (i = 0; i < aiocb->aio_niov; i++)
100
            if ((uintptr_t) aiocb->aio_iov[i].iov_base % 512)
101
                return 1;
102
    }
103

    
104
    return 0;
105
}
106

    
107
static size_t handle_aiocb_rw_linear(struct qemu_paiocb *aiocb, char *buf)
108
{
109
    size_t offset = 0;
110
    size_t len;
111

    
112
    while (offset < aiocb->aio_nbytes) {
113
         if (aiocb->aio_type == QEMU_PAIO_WRITE)
114
             len = pwrite(aiocb->aio_fildes,
115
                          (const char *)buf + offset,
116
                          aiocb->aio_nbytes - offset,
117
                          aiocb->aio_offset + offset);
118
         else
119
             len = pread(aiocb->aio_fildes,
120
                         buf + offset,
121
                         aiocb->aio_nbytes - offset,
122
                         aiocb->aio_offset + offset);
123

    
124
         if (len == -1 && errno == EINTR)
125
             continue;
126
         else if (len == -1) {
127
             offset = -errno;
128
             break;
129
         } else if (len == 0)
130
             break;
131

    
132
         offset += len;
133
    }
134

    
135
    return offset;
136
}
137

    
138
/*
 * Perform a read or write request, staging the data through a 512-byte
 * aligned bounce buffer when the iovec cannot be handed to
 * pread()/pwrite() directly (more than one segment, or a segment that
 * violates the alignment requirement).  Returns what
 * handle_aiocb_rw_linear() returns: bytes transferred or negated errno.
 */
static size_t handle_aiocb_rw(struct qemu_paiocb *aiocb)
{
    size_t nbytes;
    char *buf;

    if (!aiocb_needs_copy(aiocb) && aiocb->aio_niov == 1) {
        /*
         * If there is just a single buffer, and it is properly aligned
         * we can just use plain pread/pwrite without any problems.
         */
        return handle_aiocb_rw_linear(aiocb, aiocb->aio_iov->iov_base);
    }

    /*
     * Ok, we have to do it the hard way, copy all segments into
     * a single aligned buffer.
     */
    buf = qemu_memalign(512, aiocb->aio_nbytes);
    if (aiocb->aio_type == QEMU_PAIO_WRITE) {
        /* Gather: flatten every segment into the bounce buffer before
         * writing it out in one linear pass. */
        char *p = buf;
        int i;

        for (i = 0; i < aiocb->aio_niov; ++i) {
            memcpy(p, aiocb->aio_iov[i].iov_base, aiocb->aio_iov[i].iov_len);
            p += aiocb->aio_iov[i].iov_len;
        }
    }

    nbytes = handle_aiocb_rw_linear(aiocb, buf);
    if (aiocb->aio_type != QEMU_PAIO_WRITE) {
        /* Scatter: copy the data read back into the caller's segments,
         * capping the total at aio_nbytes ('count' tracks what is left). */
        char *p = buf;
        size_t count = aiocb->aio_nbytes, copy;
        int i;

        for (i = 0; i < aiocb->aio_niov && count; ++i) {
            copy = count;
            if (copy > aiocb->aio_iov[i].iov_len)
                copy = aiocb->aio_iov[i].iov_len;
            memcpy(aiocb->aio_iov[i].iov_base, p, copy);
            p     += copy;
            count -= copy;
        }
    }
    qemu_vfree(buf);

    return nbytes;
}
185

    
186
/*
 * Worker thread main loop.  Each worker dequeues requests from
 * request_list, executes them, publishes the result in aiocb->ret and
 * notifies completion by sending aiocb->ev_signo to the process.  A
 * worker that sees no work for ~10 seconds exits, shrinking the pool.
 */
static void *aio_thread(void *unused)
{
    pid_t pid;
    sigset_t set;

    pid = getpid();

    /* block all signals */
    if (sigfillset(&set)) die("sigfillset");
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");

    while (1) {
        struct qemu_paiocb *aiocb;
        /* 'ret' doubles as the cond_timedwait() status in the wait loop
         * below and as the request's result afterwards. */
        size_t ret = 0;
        qemu_timeval tv;
        struct timespec ts;

        /* Absolute deadline ~10 seconds from now for the idle wait. */
        qemu_gettimeofday(&tv);
        ts.tv_sec = tv.tv_sec + 10;
        ts.tv_nsec = 0;

        mutex_lock(&lock);

        /* Wait for work until the deadline passes. */
        while (TAILQ_EMPTY(&request_list) &&
               !(ret == ETIMEDOUT)) {
            ret = cond_timedwait(&cond, &lock, &ts);
        }

        /* Timed out with nothing queued: leave the loop still holding
         * 'lock'; the teardown path below releases it. */
        if (TAILQ_EMPTY(&request_list))
            break;

        /* Dequeue the oldest request and mark it in-flight so
         * qemu_paio_cancel() no longer removes it from the list. */
        aiocb = TAILQ_FIRST(&request_list);
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->active = 1;
        idle_threads--;
        mutex_unlock(&lock);

        switch (aiocb->aio_type) {
        case QEMU_PAIO_READ:
        case QEMU_PAIO_WRITE:
                ret = handle_aiocb_rw(aiocb);
                break;
        case QEMU_PAIO_IOCTL:
                ret = handle_aiocb_ioctl(aiocb);
                break;
        default:
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
                ret = -EINVAL;
                break;
        }

        /* Publish the result under the lock, then signal completion. */
        mutex_lock(&lock);
        aiocb->ret = ret;
        idle_threads++;
        mutex_unlock(&lock);

        if (kill(pid, aiocb->ev_signo)) die("kill failed");
    }

    /* Reached only via the break above, with 'lock' still held. */
    idle_threads--;
    cur_threads--;
    mutex_unlock(&lock);

    return NULL;
}
251

    
252
/*
 * Start one more (detached) worker thread.  Called with 'lock' held by
 * qemu_paio_submit(), which is what makes the bare counter updates safe.
 */
static void spawn_thread(void)
{
    cur_threads++;
    idle_threads++;
    thread_create(&thread_id, &attr, aio_thread, NULL);
}
258

    
259
int qemu_paio_init(struct qemu_paioinit *aioinit)
260
{
261
    int ret;
262

    
263
    ret = pthread_attr_init(&attr);
264
    if (ret) die2(ret, "pthread_attr_init");
265

    
266
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
267
    if (ret) die2(ret, "pthread_attr_setdetachstate");
268

    
269
    TAILQ_INIT(&request_list);
270

    
271
    return 0;
272
}
273

    
274
/*
 * Common submission path: initialize the request's bookkeeping fields,
 * queue it, and make sure a worker thread will service it.
 * Always returns 0.
 */
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
{
    aiocb->aio_type = type;
    aiocb->ret = -EINPROGRESS;  /* result sentinel until a worker finishes */
    aiocb->active = 0;          /* not yet picked up by a worker */
    mutex_lock(&lock);
    /* Grow the pool only when nobody is idle and we are below the cap. */
    if (idle_threads == 0 && cur_threads < max_threads)
        spawn_thread();
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
    mutex_unlock(&lock);
    /* Wake one waiting worker (signalled after dropping the lock). */
    cond_signal(&cond);

    return 0;
}
288

    
289
/*
 * Queue an asynchronous read.  Always returns 0; the result is later
 * stored in aiocb->ret and completion is signalled via aiocb->ev_signo.
 */
int qemu_paio_read(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
}
293

    
294
/*
 * Queue an asynchronous write.  Always returns 0; the result is later
 * stored in aiocb->ret and completion is signalled via aiocb->ev_signo.
 */
int qemu_paio_write(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
}
298

    
299
/*
 * Queue an asynchronous ioctl.  Always returns 0; the result is later
 * stored in aiocb->ret and completion is signalled via aiocb->ev_signo.
 */
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
{
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
}
303

    
304
/*
 * Fetch the current result of a request under the pool lock.
 * -EINPROGRESS means the request has not completed yet; other negative
 * values are negated errno codes; non-negative values are the byte
 * count (or ioctl result).
 */
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
{
    ssize_t ret;

    mutex_lock(&lock);
    ret = aiocb->ret;
    mutex_unlock(&lock);

    return ret;
}
314

    
315
/*
 * aio_error()-style accessor: return the positive error code for a
 * failed request, or 0 for one that succeeded (or is still pending as
 * far as the stored value goes — -EINPROGRESS maps to EINPROGRESS).
 */
int qemu_paio_error(struct qemu_paiocb *aiocb)
{
    ssize_t status = qemu_paio_return(aiocb);

    return (status < 0) ? -status : 0;
}
326

    
327
/*
 * Try to cancel a submitted request.  The 'fd' argument is unused here.
 *
 * Returns QEMU_PAIO_CANCELED if the request was still queued and has
 * been removed (aiocb->ret set to -ECANCELED), QEMU_PAIO_NOTCANCELED if
 * a worker thread is currently executing it, or QEMU_PAIO_ALLDONE if it
 * has already completed.
 */
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
{
    int ret;

    mutex_lock(&lock);
    if (!aiocb->active) {
        /* Still queued: dequeue it so no worker ever picks it up. */
        TAILQ_REMOVE(&request_list, aiocb, node);
        aiocb->ret = -ECANCELED;
        ret = QEMU_PAIO_CANCELED;
    } else if (aiocb->ret == -EINPROGRESS)
        /* A worker marked it active and has not stored a result yet. */
        ret = QEMU_PAIO_NOTCANCELED;
    else
        ret = QEMU_PAIO_ALLDONE;
    mutex_unlock(&lock);

    return ret;
}