Statistics
| Branch: | Revision:

root / buffered_file.c @ c09015dd

History | View | Annotate | Download (6.8 kB)

1
/*
2
 * QEMU buffered QEMUFile
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 */
13

    
14
#include "qemu-common.h"
15
#include "hw/hw.h"
16
#include "qemu-timer.h"
17
#include "qemu-char.h"
18
#include "buffered_file.h"
19

    
20
//#define DEBUG_BUFFERED_FILE
21

    
22
typedef struct QEMUFileBuffered
23
{
24
    BufferedPutFunc *put_buffer;
25
    BufferedPutReadyFunc *put_ready;
26
    BufferedWaitForUnfreezeFunc *wait_for_unfreeze;
27
    BufferedCloseFunc *close;
28
    void *opaque;
29
    QEMUFile *file;
30
    int freeze_output;
31
    size_t bytes_xfer;
32
    size_t xfer_limit;
33
    uint8_t *buffer;
34
    size_t buffer_size;
35
    size_t buffer_capacity;
36
    QEMUTimer *timer;
37
} QEMUFileBuffered;
38

    
39
#ifdef DEBUG_BUFFERED_FILE
40
#define DPRINTF(fmt, ...) \
41
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
42
#else
43
#define DPRINTF(fmt, ...) \
44
    do { } while (0)
45
#endif
46

    
47
static void buffered_append(QEMUFileBuffered *s,
48
                            const uint8_t *buf, size_t size)
49
{
50
    if (size > (s->buffer_capacity - s->buffer_size)) {
51
        void *tmp;
52

    
53
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
54
                s->buffer_capacity, size + 1024);
55

    
56
        s->buffer_capacity += size + 1024;
57

    
58
        tmp = g_realloc(s->buffer, s->buffer_capacity);
59
        if (tmp == NULL) {
60
            fprintf(stderr, "qemu file buffer expansion failed\n");
61
            exit(1);
62
        }
63

    
64
        s->buffer = tmp;
65
    }
66

    
67
    memcpy(s->buffer + s->buffer_size, buf, size);
68
    s->buffer_size += size;
69
}
70

    
71
static void buffered_flush(QEMUFileBuffered *s)
72
{
73
    size_t offset = 0;
74
    int error;
75

    
76
    error = qemu_file_get_error(s->file);
77
    if (error != 0) {
78
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
79
        return;
80
    }
81

    
82
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
83

    
84
    while (offset < s->buffer_size) {
85
        ssize_t ret;
86

    
87
        ret = s->put_buffer(s->opaque, s->buffer + offset,
88
                            s->buffer_size - offset);
89
        if (ret == -EAGAIN) {
90
            DPRINTF("backend not ready, freezing\n");
91
            s->freeze_output = 1;
92
            break;
93
        }
94

    
95
        if (ret <= 0) {
96
            DPRINTF("error flushing data, %zd\n", ret);
97
            qemu_file_set_error(s->file, ret);
98
            break;
99
        } else {
100
            DPRINTF("flushed %zd byte(s)\n", ret);
101
            offset += ret;
102
        }
103
    }
104

    
105
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
106
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
107
    s->buffer_size -= offset;
108
}
109

    
110
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
111
{
112
    QEMUFileBuffered *s = opaque;
113
    int offset = 0, error;
114
    ssize_t ret;
115

    
116
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
117

    
118
    error = qemu_file_get_error(s->file);
119
    if (error) {
120
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
121
        return error;
122
    }
123

    
124
    DPRINTF("unfreezing output\n");
125
    s->freeze_output = 0;
126

    
127
    buffered_flush(s);
128

    
129
    while (!s->freeze_output && offset < size) {
130
        if (s->bytes_xfer > s->xfer_limit) {
131
            DPRINTF("transfer limit exceeded when putting\n");
132
            break;
133
        }
134

    
135
        ret = s->put_buffer(s->opaque, buf + offset, size - offset);
136
        if (ret == -EAGAIN) {
137
            DPRINTF("backend not ready, freezing\n");
138
            s->freeze_output = 1;
139
            break;
140
        }
141

    
142
        if (ret <= 0) {
143
            DPRINTF("error putting\n");
144
            qemu_file_set_error(s->file, ret);
145
            offset = -EINVAL;
146
            break;
147
        }
148

    
149
        DPRINTF("put %zd byte(s)\n", ret);
150
        offset += ret;
151
        s->bytes_xfer += ret;
152
    }
153

    
154
    if (offset >= 0) {
155
        DPRINTF("buffering %d bytes\n", size - offset);
156
        buffered_append(s, buf + offset, size - offset);
157
        offset = size;
158
    }
159

    
160
    if (pos == 0 && size == 0) {
161
        DPRINTF("file is ready\n");
162
        if (s->bytes_xfer <= s->xfer_limit) {
163
            DPRINTF("notifying client\n");
164
            s->put_ready(s->opaque);
165
        }
166
    }
167

    
168
    return offset;
169
}
170

    
171
static int buffered_close(void *opaque)
172
{
173
    QEMUFileBuffered *s = opaque;
174
    int ret;
175

    
176
    DPRINTF("closing\n");
177

    
178
    while (!qemu_file_get_error(s->file) && s->buffer_size) {
179
        buffered_flush(s);
180
        if (s->freeze_output)
181
            s->wait_for_unfreeze(s->opaque);
182
    }
183

    
184
    ret = s->close(s->opaque);
185

    
186
    qemu_del_timer(s->timer);
187
    qemu_free_timer(s->timer);
188
    g_free(s->buffer);
189
    g_free(s);
190

    
191
    return ret;
192
}
193

    
194
/*
195
 * The meaning of the return values is:
196
 *   0: We can continue sending
197
 *   1: Time to stop
198
 *   negative: There has been an error
199
 */
200
static int buffered_rate_limit(void *opaque)
201
{
202
    QEMUFileBuffered *s = opaque;
203
    int ret;
204

    
205
    ret = qemu_file_get_error(s->file);
206
    if (ret) {
207
        return ret;
208
    }
209
    if (s->freeze_output)
210
        return 1;
211

    
212
    if (s->bytes_xfer > s->xfer_limit)
213
        return 1;
214

    
215
    return 0;
216
}
217

    
218
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
219
{
220
    QEMUFileBuffered *s = opaque;
221
    if (qemu_file_get_error(s->file)) {
222
        goto out;
223
    }
224
    if (new_rate > SIZE_MAX) {
225
        new_rate = SIZE_MAX;
226
    }
227

    
228
    s->xfer_limit = new_rate / 10;
229
    
230
out:
231
    return s->xfer_limit;
232
}
233

    
234
static int64_t buffered_get_rate_limit(void *opaque)
235
{
236
    QEMUFileBuffered *s = opaque;
237
  
238
    return s->xfer_limit;
239
}
240

    
241
static void buffered_rate_tick(void *opaque)
242
{
243
    QEMUFileBuffered *s = opaque;
244

    
245
    if (qemu_file_get_error(s->file)) {
246
        buffered_close(s);
247
        return;
248
    }
249

    
250
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
251

    
252
    if (s->freeze_output)
253
        return;
254

    
255
    s->bytes_xfer = 0;
256

    
257
    buffered_flush(s);
258

    
259
    /* Add some checks around this */
260
    s->put_ready(s->opaque);
261
}
262

    
263
QEMUFile *qemu_fopen_ops_buffered(void *opaque,
264
                                  size_t bytes_per_sec,
265
                                  BufferedPutFunc *put_buffer,
266
                                  BufferedPutReadyFunc *put_ready,
267
                                  BufferedWaitForUnfreezeFunc *wait_for_unfreeze,
268
                                  BufferedCloseFunc *close)
269
{
270
    QEMUFileBuffered *s;
271

    
272
    s = g_malloc0(sizeof(*s));
273

    
274
    s->opaque = opaque;
275
    s->xfer_limit = bytes_per_sec / 10;
276
    s->put_buffer = put_buffer;
277
    s->put_ready = put_ready;
278
    s->wait_for_unfreeze = wait_for_unfreeze;
279
    s->close = close;
280

    
281
    s->file = qemu_fopen_ops(s, buffered_put_buffer, NULL,
282
                             buffered_close, buffered_rate_limit,
283
                             buffered_set_rate_limit,
284
                             buffered_get_rate_limit);
285

    
286
    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
287

    
288
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
289

    
290
    return s->file;
291
}