Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 221f715d

History | View | Annotate | Download (6.1 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <sys/ioctl.h>
15
#include <pthread.h>
16
#include <unistd.h>
17
#include <errno.h>
18
#include <time.h>
19
#include <string.h>
20
#include <stdlib.h>
21
#include <stdio.h>
22
#include "osdep.h"
23

    
24
#include "posix-aio-compat.h"
25

    
26
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
27
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
28
static pthread_t thread_id;
29
static pthread_attr_t attr;
30
static int max_threads = 64;
31
static int cur_threads = 0;
32
static int idle_threads = 0;
33
static TAILQ_HEAD(, qemu_paiocb) request_list;
34

    
35
static void die2(int err, const char *what)
36
{
37
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
38
    abort();
39
}
40

    
41
static void die(const char *what)
42
{
43
    die2(errno, what);
44
}
45

    
46
static void mutex_lock(pthread_mutex_t *mutex)
47
{
48
    int ret = pthread_mutex_lock(mutex);
49
    if (ret) die2(ret, "pthread_mutex_lock");
50
}
51

    
52
static void mutex_unlock(pthread_mutex_t *mutex)
53
{
54
    int ret = pthread_mutex_unlock(mutex);
55
    if (ret) die2(ret, "pthread_mutex_unlock");
56
}
57

    
58
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
59
                           struct timespec *ts)
60
{
61
    int ret = pthread_cond_timedwait(cond, mutex, ts);
62
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
63
    return ret;
64
}
65

    
66
static void cond_signal(pthread_cond_t *cond)
67
{
68
    int ret = pthread_cond_signal(cond);
69
    if (ret) die2(ret, "pthread_cond_signal");
70
}
71

    
72
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
73
                          void *(*start_routine)(void*), void *arg)
74
{
75
    int ret = pthread_create(thread, attr, start_routine, arg);
76
    if (ret) die2(ret, "pthread_create");
77
}
78

    
79
static size_t handle_aiocb_readwrite(struct qemu_paiocb *aiocb)
80
{
81
    size_t offset = 0;
82
    ssize_t len;
83

    
84
    while (offset < aiocb->aio_nbytes) {
85
        if (aiocb->aio_type == QEMU_PAIO_WRITE)
86
            len = pwrite(aiocb->aio_fildes,
87
                         (const char *)aiocb->aio_buf + offset,
88
                         aiocb->aio_nbytes - offset,
89
                         aiocb->aio_offset + offset);
90
        else
91
            len = pread(aiocb->aio_fildes,
92
                        (char *)aiocb->aio_buf + offset,
93
                        aiocb->aio_nbytes - offset,
94
                        aiocb->aio_offset + offset);
95

    
96
        if (len == -1 && errno == EINTR)
97
            continue;
98
        else if (len == -1) {
99
            offset = -errno;
100
            break;
101
        } else if (len == 0)
102
            break;
103

    
104
        offset += len;
105
    }
106

    
107
    return offset;
108
}
109

    
110
static size_t handle_aiocb_ioctl(struct qemu_paiocb *aiocb)
111
{
112
        int ret;
113

    
114
        ret = ioctl(aiocb->aio_fildes, aiocb->aio_ioctl_cmd, aiocb->aio_buf);
115
        if (ret == -1)
116
                return -errno;
117
        return ret;
118
}
119

    
120
static void *aio_thread(void *unused)
121
{
122
    pid_t pid;
123
    sigset_t set;
124

    
125
    pid = getpid();
126

    
127
    /* block all signals */
128
    if (sigfillset(&set)) die("sigfillset");
129
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
130

    
131
    while (1) {
132
        struct qemu_paiocb *aiocb;
133
        size_t ret = 0;
134
        qemu_timeval tv;
135
        struct timespec ts;
136

    
137
        qemu_gettimeofday(&tv);
138
        ts.tv_sec = tv.tv_sec + 10;
139
        ts.tv_nsec = 0;
140

    
141
        mutex_lock(&lock);
142

    
143
        while (TAILQ_EMPTY(&request_list) &&
144
               !(ret == ETIMEDOUT)) {
145
            ret = cond_timedwait(&cond, &lock, &ts);
146
        }
147

    
148
        if (TAILQ_EMPTY(&request_list))
149
            break;
150

    
151
        aiocb = TAILQ_FIRST(&request_list);
152
        TAILQ_REMOVE(&request_list, aiocb, node);
153
        aiocb->active = 1;
154
        idle_threads--;
155
        mutex_unlock(&lock);
156

    
157
        switch (aiocb->aio_type) {
158
        case QEMU_PAIO_READ:
159
        case QEMU_PAIO_WRITE:
160
                ret = handle_aiocb_readwrite(aiocb);
161
                break;
162
        case QEMU_PAIO_IOCTL:
163
                ret = handle_aiocb_ioctl(aiocb);
164
                break;
165
        default:
166
                fprintf(stderr, "invalid aio request (0x%x)\n", aiocb->aio_type);
167
                ret = -EINVAL;
168
                break;
169
        }
170

    
171
        mutex_lock(&lock);
172
        aiocb->ret = ret;
173
        idle_threads++;
174
        mutex_unlock(&lock);
175

    
176
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
177
    }
178

    
179
    idle_threads--;
180
    cur_threads--;
181
    mutex_unlock(&lock);
182

    
183
    return NULL;
184
}
185

    
186
static void spawn_thread(void)
187
{
188
    cur_threads++;
189
    idle_threads++;
190
    thread_create(&thread_id, &attr, aio_thread, NULL);
191
}
192

    
193
int qemu_paio_init(struct qemu_paioinit *aioinit)
194
{
195
    int ret;
196

    
197
    ret = pthread_attr_init(&attr);
198
    if (ret) die2(ret, "pthread_attr_init");
199

    
200
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
201
    if (ret) die2(ret, "pthread_attr_setdetachstate");
202

    
203
    TAILQ_INIT(&request_list);
204

    
205
    return 0;
206
}
207

    
208
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int type)
209
{
210
    aiocb->aio_type = type;
211
    aiocb->ret = -EINPROGRESS;
212
    aiocb->active = 0;
213
    mutex_lock(&lock);
214
    if (idle_threads == 0 && cur_threads < max_threads)
215
        spawn_thread();
216
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
217
    mutex_unlock(&lock);
218
    cond_signal(&cond);
219

    
220
    return 0;
221
}
222

    
223
int qemu_paio_read(struct qemu_paiocb *aiocb)
224
{
225
    return qemu_paio_submit(aiocb, QEMU_PAIO_READ);
226
}
227

    
228
int qemu_paio_write(struct qemu_paiocb *aiocb)
229
{
230
    return qemu_paio_submit(aiocb, QEMU_PAIO_WRITE);
231
}
232

    
233
int qemu_paio_ioctl(struct qemu_paiocb *aiocb)
234
{
235
    return qemu_paio_submit(aiocb, QEMU_PAIO_IOCTL);
236
}
237

    
238
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
239
{
240
    ssize_t ret;
241

    
242
    mutex_lock(&lock);
243
    ret = aiocb->ret;
244
    mutex_unlock(&lock);
245

    
246
    return ret;
247
}
248

    
249
int qemu_paio_error(struct qemu_paiocb *aiocb)
250
{
251
    ssize_t ret = qemu_paio_return(aiocb);
252

    
253
    if (ret < 0)
254
        ret = -ret;
255
    else
256
        ret = 0;
257

    
258
    return ret;
259
}
260

    
261
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
262
{
263
    int ret;
264

    
265
    mutex_lock(&lock);
266
    if (!aiocb->active) {
267
        TAILQ_REMOVE(&request_list, aiocb, node);
268
        aiocb->ret = -ECANCELED;
269
        ret = QEMU_PAIO_CANCELED;
270
    } else if (aiocb->ret == -EINPROGRESS)
271
        ret = QEMU_PAIO_NOTCANCELED;
272
    else
273
        ret = QEMU_PAIO_ALLDONE;
274
    mutex_unlock(&lock);
275

    
276
    return ret;
277
}