Statistics
| Branch: | Revision:

root / buffered_file.c @ 7bd427d8

History | View | Annotate | Download (6.4 kB)

1
/*
2
 * QEMU buffered QEMUFile
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include "qemu-common.h"
15
#include "hw/hw.h"
16
#include "qemu-timer.h"
17
#include "sysemu.h"
18
#include "qemu-char.h"
19
#include "buffered_file.h"
20

    
21
//#define DEBUG_BUFFERED_FILE
22

    
23
typedef struct QEMUFileBuffered
24
{
25
    BufferedPutFunc *put_buffer;
26
    BufferedPutReadyFunc *put_ready;
27
    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
28
    BufferedCloseFunc *close;
29
    void *opaque;
30
    QEMUFile *file;
31
    int has_error;
32
    int freeze_output;
33
    size_t bytes_xfer;
34
    size_t xfer_limit;
35
    uint8_t *buffer;
36
    size_t buffer_size;
37
    size_t buffer_capacity;
38
    QEMUTimer *timer;
39
} QEMUFileBuffered;
40

    
41
#ifdef DEBUG_BUFFERED_FILE
42
#define DPRINTF(fmt, ...) \
43
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
44
#else
45
#define DPRINTF(fmt, ...) \
46
    do { } while (0)
47
#endif
48

    
49
static void buffered_append(QEMUFileBuffered *s,
50
                            const uint8_t *buf, size_t size)
51
{
52
    if (size > (s->buffer_capacity - s->buffer_size)) {
53
        void *tmp;
54

    
55
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
56
                s->buffer_capacity, size + 1024);
57

    
58
        s->buffer_capacity += size + 1024;
59

    
60
        tmp = qemu_realloc(s->buffer, s->buffer_capacity);
61
        if (tmp == NULL) {
62
            fprintf(stderr, "qemu file buffer expansion failed\n");
63
            exit(1);
64
        }
65

    
66
        s->buffer = tmp;
67
    }
68

    
69
    memcpy(s->buffer + s->buffer_size, buf, size);
70
    s->buffer_size += size;
71
}
72

    
73
static void buffered_flush(QEMUFileBuffered *s)
74
{
75
    size_t offset = 0;
76

    
77
    if (s->has_error) {
78
        DPRINTF("flush when error, bailing\n");
79
        return;
80
    }
81

    
82
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
83

    
84
    while (offset < s->buffer_size) {
85
        ssize_t ret;
86

    
87
        ret = s->put_buffer(s->opaque, s->buffer + offset,
88
                            s->buffer_size - offset);
89
        if (ret == -EAGAIN) {
90
            DPRINTF("backend not ready, freezing\n");
91
            s->freeze_output = 1;
92
            break;
93
        }
94

    
95
        if (ret <= 0) {
96
            DPRINTF("error flushing data, %zd\n", ret);
97
            s->has_error = 1;
98
            break;
99
        } else {
100
            DPRINTF("flushed %zd byte(s)\n", ret);
101
            offset += ret;
102
        }
103
    }
104

    
105
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
106
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
107
    s->buffer_size -= offset;
108
}
109

    
110
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
111
{
112
    QEMUFileBuffered *s = opaque;
113
    int offset = 0;
114
    ssize_t ret;
115

    
116
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
117

    
118
    if (s->has_error) {
119
        DPRINTF("flush when error, bailing\n");
120
        return -EINVAL;
121
    }
122

    
123
    DPRINTF("unfreezing output\n");
124
    s->freeze_output = 0;
125

    
126
    buffered_flush(s);
127

    
128
    while (!s->freeze_output && offset < size) {
129
        if (s->bytes_xfer > s->xfer_limit) {
130
            DPRINTF("transfer limit exceeded when putting\n");
131
            break;
132
        }
133

    
134
        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
135
        if (ret == -EAGAIN) {
136
            DPRINTF("backend not ready, freezing\n");
137
            s->freeze_output = 1;
138
            break;
139
        }
140

    
141
        if (ret <= 0) {
142
            DPRINTF("error putting\n");
143
            s->has_error = 1;
144
            offset = -EINVAL;
145
            break;
146
        }
147

    
148
        DPRINTF("put %zd byte(s)\n", ret);
149
        offset += ret;
150
        s->bytes_xfer += ret;
151
    }
152

    
153
    if (offset >= 0) {
154
        DPRINTF("buffering %d bytes\n", size - offset);
155
        buffered_append(s, buf + offset, size - offset);
156
        offset = size;
157
    }
158

    
159
    if (pos == 0 && size == 0) {
160
        DPRINTF("file is ready\n");
161
        if (s->bytes_xfer <= s->xfer_limit) {
162
            DPRINTF("notifying client\n");
163
            s->put_ready(s->opaque);
164
        }
165
    }
166

    
167
    return offset;
168
}
169

    
170
static int buffered_close(void *opaque)
171
{
172
    QEMUFileBuffered *s = opaque;
173
    int ret;
174

    
175
    DPRINTF("closing\n");
176

    
177
    while (!s->has_error && s->buffer_size) {
178
        buffered_flush(s);
179
        if (s->freeze_output)
180
            s->wait_for_unfreeze(s);
181
    }
182

    
183
    ret = s->close(s->opaque);
184

    
185
    qemu_del_timer(s->timer);
186
    qemu_free_timer(s->timer);
187
    qemu_free(s->buffer);
188
    qemu_free(s);
189

    
190
    return ret;
191
}
192

    
193
static int buffered_rate_limit(void *opaque)
194
{
195
    QEMUFileBuffered *s = opaque;
196

    
197
    if (s->has_error)
198
        return 0;
199

    
200
    if (s->freeze_output)
201
        return 1;
202

    
203
    if (s->bytes_xfer > s->xfer_limit)
204
        return 1;
205

    
206
    return 0;
207
}
208

    
209
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
210
{
211
    QEMUFileBuffered *s = opaque;
212
    if (s->has_error)
213
        goto out;
214

    
215
    if (new_rate > SIZE_MAX) {
216
        new_rate = SIZE_MAX;
217
    }
218

    
219
    s->xfer_limit = new_rate / 10;
220
    
221
out:
222
    return s->xfer_limit;
223
}
224

    
225
static int64_t buffered_get_rate_limit(void *opaque)
226
{
227
    QEMUFileBuffered *s = opaque;
228
  
229
    return s->xfer_limit;
230
}
231

    
232
static void buffered_rate_tick(void *opaque)
233
{
234
    QEMUFileBuffered *s = opaque;
235

    
236
    if (s->has_error) {
237
        buffered_close(s);
238
        return;
239
    }
240

    
241
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
242

    
243
    if (s->freeze_output)
244
        return;
245

    
246
    s->bytes_xfer = 0;
247

    
248
    buffered_flush(s);
249

    
250
    /* Add some checks around this */
251
    s->put_ready(s->opaque);
252
}
253

    
254
QEMUFile *qemu_fopen_ops_buffered(void *opaque,
255
                                  size_t bytes_per_sec,
256
                                  BufferedPutFunc *put_buffer,
257
                                  BufferedPutReadyFunc *put_ready,
258
                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
259
                                  BufferedCloseFunc *close)
260
{
261
    QEMUFileBuffered *s;
262

    
263
    s = qemu_mallocz(sizeof(*s));
264

    
265
    s->opaque = opaque;
266
    s->xfer_limit = bytes_per_sec / 10;
267
    s->put_buffer = put_buffer;
268
    s->put_ready = put_ready;
269
    s->wait_for_unfreeze = wait_for_unfreeze;
270
    s->close = close;
271

    
272
    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
273
                             buffered_close, buffered_rate_limit,
274
                             buffered_set_rate_limit,
275
                             buffered_get_rate_limit);
276

    
277
    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
278

    
279
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
280

    
281
    return s->file;
282
}