Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 1d6198c3

History | View | Annotate | Download (4.5 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <pthread.h>
15
#include <unistd.h>
16
#include <errno.h>
17
#include <sys/time.h>
18
#include "osdep.h"
19

    
20
#include "posix-aio-compat.h"
21

    
22
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
23
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
24
static pthread_t thread_id;
25
static int max_threads = 64;
26
static int cur_threads = 0;
27
static int idle_threads = 0;
28
static TAILQ_HEAD(, qemu_paiocb) request_list;
29

    
30
static void *aio_thread(void *unused)
31
{
32
    sigset_t set;
33

    
34
    /* block all signals */
35
    sigfillset(&set);
36
    sigprocmask(SIG_BLOCK, &set, NULL);
37

    
38
    while (1) {
39
        struct qemu_paiocb *aiocb;
40
        size_t offset;
41
        int ret = 0;
42

    
43
        pthread_mutex_lock(&lock);
44

    
45
        while (TAILQ_EMPTY(&request_list) &&
46
               !(ret == ETIMEDOUT)) {
47
            struct timespec ts = { 0 };
48
            qemu_timeval tv;
49

    
50
            qemu_gettimeofday(&tv);
51
            ts.tv_sec = tv.tv_sec + 10;
52
            ret = pthread_cond_timedwait(&cond, &lock, &ts);
53
        }
54

    
55
        if (ret == ETIMEDOUT)
56
            break;
57

    
58
        aiocb = TAILQ_FIRST(&request_list);
59
        TAILQ_REMOVE(&request_list, aiocb, node);
60

    
61
        offset = 0;
62
        aiocb->active = 1;
63

    
64
        idle_threads--;
65
        pthread_mutex_unlock(&lock);
66

    
67
        while (offset < aiocb->aio_nbytes) {
68
            ssize_t len;
69

    
70
            if (aiocb->is_write)
71
                len = pwrite(aiocb->aio_fildes,
72
                             (const char *)aiocb->aio_buf + offset,
73
                             aiocb->aio_nbytes - offset,
74
                             aiocb->aio_offset + offset);
75
            else
76
                len = pread(aiocb->aio_fildes,
77
                            (char *)aiocb->aio_buf + offset,
78
                            aiocb->aio_nbytes - offset,
79
                            aiocb->aio_offset + offset);
80

    
81
            if (len == -1 && errno == EINTR)
82
                continue;
83
            else if (len == -1) {
84
                pthread_mutex_lock(&lock);
85
                aiocb->ret = -errno;
86
                pthread_mutex_unlock(&lock);
87
                break;
88
            } else if (len == 0)
89
                break;
90

    
91
            offset += len;
92

    
93
            pthread_mutex_lock(&lock);
94
            aiocb->ret = offset;
95
            pthread_mutex_unlock(&lock);
96
        }
97

    
98
        pthread_mutex_lock(&lock);
99
        idle_threads++;
100
        pthread_mutex_unlock(&lock);
101

    
102
        sigqueue(getpid(),
103
                 aiocb->aio_sigevent.sigev_signo,
104
                 aiocb->aio_sigevent.sigev_value);
105
    }
106

    
107
    idle_threads--;
108
    cur_threads--;
109
    pthread_mutex_unlock(&lock);
110

    
111
    return NULL;
112
}
113

    
114
static int spawn_thread(void)
115
{
116
    pthread_attr_t attr;
117
    int ret;
118

    
119
    cur_threads++;
120
    idle_threads++;
121

    
122
    pthread_attr_init(&attr);
123
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
124
    ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
125
    pthread_attr_destroy(&attr);
126

    
127
    return ret;
128
}
129

    
130
int qemu_paio_init(struct qemu_paioinit *aioinit)
131
{
132
    TAILQ_INIT(&request_list);
133

    
134
    return 0;
135
}
136

    
137
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
138
{
139
    aiocb->is_write = is_write;
140
    aiocb->ret = -EINPROGRESS;
141
    aiocb->active = 0;
142
    pthread_mutex_lock(&lock);
143
    if (idle_threads == 0 && cur_threads < max_threads)
144
        spawn_thread();
145
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
146
    pthread_mutex_unlock(&lock);
147
    pthread_cond_broadcast(&cond);
148

    
149
    return 0;
150
}
151

    
152
int qemu_paio_read(struct qemu_paiocb *aiocb)
153
{
154
    return qemu_paio_submit(aiocb, 0);
155
}
156

    
157
int qemu_paio_write(struct qemu_paiocb *aiocb)
158
{
159
    return qemu_paio_submit(aiocb, 1);
160
}
161

    
162
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
163
{
164
    ssize_t ret;
165

    
166
    pthread_mutex_lock(&lock);
167
    ret = aiocb->ret;
168
    pthread_mutex_unlock(&lock);
169

    
170
    return ret;
171
}
172

    
173
int qemu_paio_error(struct qemu_paiocb *aiocb)
174
{
175
    ssize_t ret = qemu_paio_return(aiocb);
176

    
177
    if (ret < 0)
178
        ret = -ret;
179
    else
180
        ret = 0;
181

    
182
    return ret;
183
}
184

    
185
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
186
{
187
    int ret;
188

    
189
    pthread_mutex_lock(&lock);
190
    if (!aiocb->active) {
191
        TAILQ_REMOVE(&request_list, aiocb, node);
192
        aiocb->ret = -ECANCELED;
193
        ret = QEMU_PAIO_CANCELED;
194
    } else if (aiocb->ret == -EINPROGRESS)
195
        ret = QEMU_PAIO_NOTCANCELED;
196
    else
197
        ret = QEMU_PAIO_ALLDONE;
198
    pthread_mutex_unlock(&lock);
199

    
200
    return ret;
201
}