Statistics
| Branch: | Revision:

root / posix-aio-compat.c @ 17759187

History | View | Annotate | Download (4.3 kB)

1
/*
2
 * QEMU posix-aio emulation
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include <pthread.h>
15
#include <unistd.h>
16
#include <errno.h>
17
#include <sys/time.h>
18
#include "osdep.h"
19

    
20
#include "posix-aio-compat.h"
21

    
22
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
23
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
24
static pthread_t thread_id;
25
static int max_threads = 64;
26
static int cur_threads = 0;
27
static int idle_threads = 0;
28
static TAILQ_HEAD(, qemu_paiocb) request_list;
29

    
30
static void *aio_thread(void *unused)
31
{
32
    sigset_t set;
33

    
34
    /* block all signals */
35
    sigfillset(&set);
36
    sigprocmask(SIG_BLOCK, &set, NULL);
37

    
38
    while (1) {
39
        struct qemu_paiocb *aiocb;
40
        size_t offset;
41
        int ret = 0;
42

    
43
        pthread_mutex_lock(&lock);
44

    
45
        while (TAILQ_EMPTY(&request_list) &&
46
               !(ret == ETIMEDOUT)) {
47
            struct timespec ts = { 0 };
48
            qemu_timeval tv;
49

    
50
            qemu_gettimeofday(&tv);
51
            ts.tv_sec = tv.tv_sec + 10;
52
            ret = pthread_cond_timedwait(&cond, &lock, &ts);
53
        }
54

    
55
        if (ret == ETIMEDOUT)
56
            break;
57

    
58
        aiocb = TAILQ_FIRST(&request_list);
59
        TAILQ_REMOVE(&request_list, aiocb, node);
60

    
61
        offset = 0;
62
        aiocb->active = 1;
63

    
64
        idle_threads--;
65
        pthread_mutex_unlock(&lock);
66

    
67
        while (offset < aiocb->aio_nbytes) {
68
            ssize_t len;
69

    
70
            if (aiocb->is_write)
71
                len = pwrite(aiocb->aio_fildes,
72
                             (const char *)aiocb->aio_buf + offset,
73
                             aiocb->aio_nbytes - offset,
74
                             aiocb->aio_offset + offset);
75
            else
76
                len = pread(aiocb->aio_fildes,
77
                            (char *)aiocb->aio_buf + offset,
78
                            aiocb->aio_nbytes - offset,
79
                            aiocb->aio_offset + offset);
80

    
81
            if (len == -1 && errno == EINTR)
82
                continue;
83
            else if (len == -1) {
84
                offset = -errno;
85
                break;
86
            } else if (len == 0)
87
                break;
88

    
89
            offset += len;
90
        }
91

    
92
        pthread_mutex_lock(&lock);
93
        aiocb->ret = offset;
94
        idle_threads++;
95
        pthread_mutex_unlock(&lock);
96

    
97
        kill(getpid(), aiocb->sigev_signo);
98
    }
99

    
100
    idle_threads--;
101
    cur_threads--;
102
    pthread_mutex_unlock(&lock);
103

    
104
    return NULL;
105
}
106

    
107
static int spawn_thread(void)
108
{
109
    pthread_attr_t attr;
110
    int ret;
111

    
112
    cur_threads++;
113
    idle_threads++;
114

    
115
    pthread_attr_init(&attr);
116
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
117
    ret = pthread_create(&thread_id, &attr, aio_thread, NULL);
118
    pthread_attr_destroy(&attr);
119

    
120
    return ret;
121
}
122

    
123
int qemu_paio_init(struct qemu_paioinit *aioinit)
124
{
125
    TAILQ_INIT(&request_list);
126

    
127
    return 0;
128
}
129

    
130
static int qemu_paio_submit(struct qemu_paiocb *aiocb, int is_write)
131
{
132
    aiocb->is_write = is_write;
133
    aiocb->ret = -EINPROGRESS;
134
    aiocb->active = 0;
135
    pthread_mutex_lock(&lock);
136
    if (idle_threads == 0 && cur_threads < max_threads)
137
        spawn_thread();
138
    TAILQ_INSERT_TAIL(&request_list, aiocb, node);
139
    pthread_mutex_unlock(&lock);
140
    pthread_cond_broadcast(&cond);
141

    
142
    return 0;
143
}
144

    
145
int qemu_paio_read(struct qemu_paiocb *aiocb)
146
{
147
    return qemu_paio_submit(aiocb, 0);
148
}
149

    
150
int qemu_paio_write(struct qemu_paiocb *aiocb)
151
{
152
    return qemu_paio_submit(aiocb, 1);
153
}
154

    
155
ssize_t qemu_paio_return(struct qemu_paiocb *aiocb)
156
{
157
    ssize_t ret;
158

    
159
    pthread_mutex_lock(&lock);
160
    ret = aiocb->ret;
161
    pthread_mutex_unlock(&lock);
162

    
163
    return ret;
164
}
165

    
166
int qemu_paio_error(struct qemu_paiocb *aiocb)
167
{
168
    ssize_t ret = qemu_paio_return(aiocb);
169

    
170
    if (ret < 0)
171
        ret = -ret;
172
    else
173
        ret = 0;
174

    
175
    return ret;
176
}
177

    
178
int qemu_paio_cancel(int fd, struct qemu_paiocb *aiocb)
179
{
180
    int ret;
181

    
182
    pthread_mutex_lock(&lock);
183
    if (!aiocb->active) {
184
        TAILQ_REMOVE(&request_list, aiocb, node);
185
        aiocb->ret = -ECANCELED;
186
        ret = QEMU_PAIO_CANCELED;
187
    } else if (aiocb->ret == -EINPROGRESS)
188
        ret = QEMU_PAIO_NOTCANCELED;
189
    else
190
        ret = QEMU_PAIO_ALLDONE;
191
    pthread_mutex_unlock(&lock);
192

    
193
    return ret;
194
}