Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 93fcfe39

History | View | Annotate | Download (4.4 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <pthread.h>
15
#include <unistd.h>
16
#include <errno.h>
17
#include <sys/time.h>
18
#include "osdep.h"
19

    
20
#include "posix-aio-compat.h"
21

    
22
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
23
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
24
static pthread_t thread_id;
25
static int max_threads = 64;
26
static int cur_threads = 0;
27
static int idle_threads = 0;
28
static TAILQ_HEAD(, qemu_paiocb) request_list;
29

    
30
static void *aio_thread(void *unused)
31
{
32
    sigset_t set;
33

    
34
    /* block all signals */
35
    sigfillset(&set);
36
    sigprocmask(SIG_BLOCK, &set, NULL);
37

    
38
    while (1) {
39
        struct qemu_paiocb *aiocb;
40
        size_t offset;
41
        int ret = 0;
42

    
43
        pthread_mutex_lock(&lock);
44

    
45
        while (TAILQ_EMPTY(&request_list) &&
46
               !(ret == ETIMEDOUT)) {
47
            struct timespec ts = { 0 };
48
            qemu_timeval tv;
49

    
50
            qemu_gettimeofday(&tv);
51
            ts.tv_sec = tv.tv_sec + 10;
52
            ret = pthread_cond_timedwait(&cond, &lock, &ts);
53
        }
54

    
55
        if (ret == ETIMEDOUT)
56
            break;
57

    
58
        aiocb = TAILQ_FIRST(&request_list);
59
        TAILQ_REMOVE(&request_list, aiocb, node);
60

    
61
        offset = 0;
62
        aiocb->active = 1;
63

    
64
        idle_threads--;
65
        pthread_mutex_unlock(&lock);
66

    
67
        while (offset < aiocb->aio_nbytes) {
68
            ssize_t len;
69

    
70
            if (aiocb->is_write)
71
                len = pwrite(aiocb->aio_fildes,
72
                             (const char *)aiocb->aio_buf + offset,
73
                             aiocb->aio_nbytes - offset,
74
                             aiocb->aio_offset + offset);
75
            else
76
                len = pread(aiocb->aio_fildes,
77
                            (char *)aiocb->aio_buf + offset,
78
                            aiocb->aio_nbytes - offset,
79
                            aiocb->aio_offset + offset);
80

    
81
            if (len == -1 && errno == EINTR)
82
                continue;
83
            else if (len == -1) {
84
                offset = -errno;
85
                break;
86
            } else if (len == 0)
87
                break;
88

    
89
            offset += len;
90
        }
91

    
92
        pthread_mutex_lock(&lock);
93
        aiocb->ret = offset;
94
        idle_threads++;
95
        pthread_mutex_unlock(&lock);
96

    
97
        sigqueue(getpid(),
98
                 aiocb->aio_sigevent.sigev_signo,
99
                 aiocb->aio_sigevent.sigev_value);
100
    }
101

    
102
    idle_threads--;
103
    cur_threads--;
104
    pthread_mutex_unlock(&lock);
105

    
106
    return NULL;
107
}
108

    
109
static int spawn_thread(void)
110
{
111
    pthread_attr_t attr;
112
    int ret;
113

    
114
    cur_threads++;
115
    idle_threads++;
116

    
117
    pthread_attr_init(&attr);
118
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
119
    ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
120
    pthread_attr_destroy(&attr);
121

    
122
    return ret;
123
}
124

    
125
int qemu_paio_init(struct qemu_paioinit *aioinit)
126
{
127
    TAILQ_INIT(&request_list);
128

    
129
    return 0;
130
}
131

    
132
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
133
{
134
    aiocb->is_write = is_write;
135
    aiocb->ret = -EINPROGRESS;
136
    aiocb->active = 0;
137
    pthread_mutex_lock(&lock);
138
    if (idle_threads == 0 && cur_threads < max_threads)
139
        spawn_thread();
140
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
141
    pthread_mutex_unlock(&lock);
142
    pthread_cond_broadcast(&cond);
143

    
144
    return 0;
145
}
146

    
147
int qemu_paio_read(struct qemu_paiocb *aiocb)
148
{
149
    return qemu_paio_submit(aiocb, 0);
150
}
151

    
152
int qemu_paio_write(struct qemu_paiocb *aiocb)
153
{
154
    return qemu_paio_submit(aiocb, 1);
155
}
156

    
157
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
158
{
159
    ssize_t ret;
160

    
161
    pthread_mutex_lock(&lock);
162
    ret = aiocb->ret;
163
    pthread_mutex_unlock(&lock);
164

    
165
    return ret;
166
}
167

    
168
int qemu_paio_error(struct qemu_paiocb *aiocb)
169
{
170
    ssize_t ret = qemu_paio_return(aiocb);
171

    
172
    if (ret < 0)
173
        ret = -ret;
174
    else
175
        ret = 0;
176

    
177
    return ret;
178
}
179

    
180
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
181
{
182
    int ret;
183

    
184
    pthread_mutex_lock(&lock);
185
    if (!aiocb->active) {
186
        TAILQ_REMOVE(&request_list, aiocb, node);
187
        aiocb->ret = -ECANCELED;
188
        ret = QEMU_PAIO_CANCELED;
189
    } else if (aiocb->ret == -EINPROGRESS)
190
        ret = QEMU_PAIO_NOTCANCELED;
191
    else
192
        ret = QEMU_PAIO_ALLDONE;
193
    pthread_mutex_unlock(&lock);
194

    
195
    return ret;
196
}