Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ b42ec42d

History | View | Annotate | Download (5.5 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <pthread.h>
15
#include <unistd.h>
16
#include <errno.h>
17
#include <time.h>
18
#include <string.h>
19
#include <stdlib.h>
20
#include <stdio.h>
21
#include "osdep.h"
22

    
23
#include "posix-aio-compat.h"
24

    
25
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
26
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
27
static pthread_t thread_id;
28
static pthread_attr_t attr;
29
static int max_threads = 64;
30
static int cur_threads = 0;
31
static int idle_threads = 0;
32
static TAILQ_HEAD(, qemu_paiocb) request_list;
33

    
34
static void die2(int err, const char *what)
35
{
36
    fprintf(stderr, "%s failed: %s\n", what, strerror(err));
37
    abort();
38
}
39

    
40
static void die(const char *what)
41
{
42
    die2(errno, what);
43
}
44

    
45
static void mutex_lock(pthread_mutex_t *mutex)
46
{
47
    int ret = pthread_mutex_lock(mutex);
48
    if (ret) die2(ret, "pthread_mutex_lock");
49
}
50

    
51
static void mutex_unlock(pthread_mutex_t *mutex)
52
{
53
    int ret = pthread_mutex_unlock(mutex);
54
    if (ret) die2(ret, "pthread_mutex_unlock");
55
}
56

    
57
static int cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex,
58
                           struct timespec *ts)
59
{
60
    int ret = pthread_cond_timedwait(cond, mutex, ts);
61
    if (ret && ret != ETIMEDOUT) die2(ret, "pthread_cond_timedwait");
62
    return ret;
63
}
64

    
65
static void cond_signal(pthread_cond_t *cond)
66
{
67
    int ret = pthread_cond_signal(cond);
68
    if (ret) die2(ret, "pthread_cond_signal");
69
}
70

    
71
static void thread_create(pthread_t *thread, pthread_attr_t *attr,
72
                          void *(*start_routine)(void*), void *arg)
73
{
74
    int ret = pthread_create(thread, attr, start_routine, arg);
75
    if (ret) die2(ret, "pthread_create");
76
}
77

    
78
static void *aio_thread(void *unused)
79
{
80
    pid_t pid;
81
    sigset_t set;
82

    
83
    pid = getpid();
84

    
85
    /* block all signals */
86
    if (sigfillset(&set)) die("sigfillset");
87
    if (sigprocmask(SIG_BLOCK, &set, NULL)) die("sigprocmask");
88

    
89
    while (1) {
90
        struct qemu_paiocb *aiocb;
91
        size_t offset;
92
        int ret = 0;
93
        qemu_timeval tv;
94
        struct timespec ts;
95

    
96
        qemu_gettimeofday(&tv);
97
        ts.tv_sec = tv.tv_sec + 10;
98
        ts.tv_nsec = 0;
99

    
100
        mutex_lock(&lock);
101

    
102
        while (TAILQ_EMPTY(&request_list) &&
103
               !(ret == ETIMEDOUT)) {
104
            ret = cond_timedwait(&cond, &lock, &ts);
105
        }
106

    
107
        if (TAILQ_EMPTY(&request_list))
108
            break;
109

    
110
        aiocb = TAILQ_FIRST(&request_list);
111
        TAILQ_REMOVE(&request_list, aiocb, node);
112

    
113
        offset = 0;
114
        aiocb->active = 1;
115

    
116
        idle_threads--;
117
        mutex_unlock(&lock);
118

    
119
        while (offset < aiocb->aio_nbytes) {
120
            ssize_t len;
121

    
122
            if (aiocb->is_write)
123
                len = pwrite(aiocb->aio_fildes,
124
                             (const char *)aiocb->aio_buf + offset,
125
                             aiocb->aio_nbytes - offset,
126
                             aiocb->aio_offset + offset);
127
            else
128
                len = pread(aiocb->aio_fildes,
129
                            (char *)aiocb->aio_buf + offset,
130
                            aiocb->aio_nbytes - offset,
131
                            aiocb->aio_offset + offset);
132

    
133
            if (len == -1 && errno == EINTR)
134
                continue;
135
            else if (len == -1) {
136
                offset = -errno;
137
                break;
138
            } else if (len == 0)
139
                break;
140

    
141
            offset += len;
142
        }
143

    
144
        mutex_lock(&lock);
145
        aiocb->ret = offset;
146
        idle_threads++;
147
        mutex_unlock(&lock);
148

    
149
        if (kill(pid, aiocb->ev_signo)) die("kill failed");
150
    }
151

    
152
    idle_threads--;
153
    cur_threads--;
154
    mutex_unlock(&lock);
155

    
156
    return NULL;
157
}
158

    
159
static void spawn_thread(void)
160
{
161
    cur_threads++;
162
    idle_threads++;
163
    thread_create(&thread_id, &attr, aio_thread, NULL);
164
}
165

    
166
int qemu_paio_init(struct qemu_paioinit *aioinit)
167
{
168
    int ret;
169

    
170
    ret = pthread_attr_init(&attr);
171
    if (ret) die2(ret, "pthread_attr_init");
172

    
173
    ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
174
    if (ret) die2(ret, "pthread_attr_setdetachstate");
175

    
176
    TAILQ_INIT(&request_list);
177

    
178
    return 0;
179
}
180

    
181
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
182
{
183
    aiocb->is_write = is_write;
184
    aiocb->ret = -EINPROGRESS;
185
    aiocb->active = 0;
186
    mutex_lock(&lock);
187
    if (idle_threads == 0 && cur_threads < max_threads)
188
        spawn_thread();
189
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
190
    mutex_unlock(&lock);
191
    cond_signal(&cond);
192

    
193
    return 0;
194
}
195

    
196
int qemu_paio_read(struct qemu_paiocb *aiocb)
197
{
198
    return qemu_paio_submit(aiocb, 0);
199
}
200

    
201
int qemu_paio_write(struct qemu_paiocb *aiocb)
202
{
203
    return qemu_paio_submit(aiocb, 1);
204
}
205

    
206
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
207
{
208
    ssize_t ret;
209

    
210
    mutex_lock(&lock);
211
    ret = aiocb->ret;
212
    mutex_unlock(&lock);
213

    
214
    return ret;
215
}
216

    
217
int qemu_paio_error(struct qemu_paiocb *aiocb)
218
{
219
    ssize_t ret = qemu_paio_return(aiocb);
220

    
221
    if (ret < 0)
222
        ret = -ret;
223
    else
224
        ret = 0;
225

    
226
    return ret;
227
}
228

    
229
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
230
{
231
    int ret;
232

    
233
    mutex_lock(&lock);
234
    if (!aiocb->active) {
235
        TAILQ_REMOVE(&request_list, aiocb, node);
236
        aiocb->ret = -ECANCELED;
237
        ret = QEMU_PAIO_CANCELED;
238
    } else if (aiocb->ret == -EINPROGRESS)
239
        ret = QEMU_PAIO_NOTCANCELED;
240
    else
241
        ret = QEMU_PAIO_ALLDONE;
242
    mutex_unlock(&lock);
243

    
244
    return ret;
245
}