Statistics
| Branch: | Revision:

root / buffered_file.c @ 1de7afc9

History | View | Annotate | Download (6.1 kB)

1
/*
2
 * QEMU buffered QEMUFile
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 * Contributions after 2012-01-13 are licensed under the terms of the
13
 * GNU GPL, version 2 or (at your option) any later version.
14
 */
15

    
16
#include "qemu-common.h"
17
#include "hw/hw.h"
18
#include "qemu/timer.h"
19
#include "buffered_file.h"
20

    
21
//#define DEBUG_BUFFERED_FILE
22

    
23
typedef struct QEMUFileBuffered
24
{
25
    MigrationState *migration_state;
26
    QEMUFile *file;
27
    int freeze_output;
28
    size_t bytes_xfer;
29
    size_t xfer_limit;
30
    uint8_t *buffer;
31
    size_t buffer_size;
32
    size_t buffer_capacity;
33
    QEMUTimer *timer;
34
} QEMUFileBuffered;
35

    
36
#ifdef DEBUG_BUFFERED_FILE
37
#define DPRINTF(fmt, ...) \
38
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
39
#else
40
#define DPRINTF(fmt, ...) \
41
    do { } while (0)
42
#endif
43

    
44
static void buffered_append(QEMUFileBuffered *s,
45
                            const uint8_t *buf, size_t size)
46
{
47
    if (size > (s->buffer_capacity - s->buffer_size)) {
48
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
49
                s->buffer_capacity, size + 1024);
50

    
51
        s->buffer_capacity += size + 1024;
52

    
53
        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
54
    }
55

    
56
    memcpy(s->buffer + s->buffer_size, buf, size);
57
    s->buffer_size += size;
58
}
59

    
60
static ssize_t buffered_flush(QEMUFileBuffered *s)
61
{
62
    size_t offset = 0;
63
    ssize_t ret = 0;
64

    
65
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
66

    
67
    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
68

    
69
        ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
70
                                    s->buffer_size - offset);
71
        if (ret == -EAGAIN) {
72
            DPRINTF("backend not ready, freezing\n");
73
            ret = 0;
74
            s->freeze_output = 1;
75
            break;
76
        }
77

    
78
        if (ret <= 0) {
79
            DPRINTF("error flushing data, %zd\n", ret);
80
            break;
81
        } else {
82
            DPRINTF("flushed %zd byte(s)\n", ret);
83
            offset += ret;
84
            s->bytes_xfer += ret;
85
        }
86
    }
87

    
88
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
89
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
90
    s->buffer_size -= offset;
91

    
92
    if (ret < 0) {
93
        return ret;
94
    }
95
    return offset;
96
}
97

    
98
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
99
{
100
    QEMUFileBuffered *s = opaque;
101
    ssize_t error;
102

    
103
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
104

    
105
    error = qemu_file_get_error(s->file);
106
    if (error) {
107
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
108
        return error;
109
    }
110

    
111
    DPRINTF("unfreezing output\n");
112
    s->freeze_output = 0;
113

    
114
    if (size > 0) {
115
        DPRINTF("buffering %d bytes\n", size - offset);
116
        buffered_append(s, buf, size);
117
    }
118

    
119
    error = buffered_flush(s);
120
    if (error < 0) {
121
        DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
122
        return error;
123
    }
124

    
125
    if (pos == 0 && size == 0) {
126
        DPRINTF("file is ready\n");
127
        if (!s->freeze_output && s->bytes_xfer < s->xfer_limit) {
128
            DPRINTF("notifying client\n");
129
            migrate_fd_put_ready(s->migration_state);
130
        }
131
    }
132

    
133
    return size;
134
}
135

    
136
static int buffered_close(void *opaque)
137
{
138
    QEMUFileBuffered *s = opaque;
139
    ssize_t ret = 0;
140
    int ret2;
141

    
142
    DPRINTF("closing\n");
143

    
144
    s->xfer_limit = INT_MAX;
145
    while (!qemu_file_get_error(s->file) && s->buffer_size) {
146
        ret = buffered_flush(s);
147
        if (ret < 0) {
148
            break;
149
        }
150
        if (s->freeze_output) {
151
            ret = migrate_fd_wait_for_unfreeze(s->migration_state);
152
            if (ret < 0) {
153
                break;
154
            }
155
        }
156
    }
157

    
158
    ret2 = migrate_fd_close(s->migration_state);
159
    if (ret >= 0) {
160
        ret = ret2;
161
    }
162
    qemu_del_timer(s->timer);
163
    qemu_free_timer(s->timer);
164
    g_free(s->buffer);
165
    g_free(s);
166

    
167
    return ret;
168
}
169

    
170
/*
171
 * The meaning of the return values is:
172
 *   0: We can continue sending
173
 *   1: Time to stop
174
 *   negative: There has been an error
175
 */
176
static int buffered_get_fd(void *opaque)
177
{
178
    QEMUFileBuffered *s = opaque;
179

    
180
    return qemu_get_fd(s->file);
181
}
182

    
183
static int buffered_rate_limit(void *opaque)
184
{
185
    QEMUFileBuffered *s = opaque;
186
    int ret;
187

    
188
    ret = qemu_file_get_error(s->file);
189
    if (ret) {
190
        return ret;
191
    }
192
    if (s->freeze_output)
193
        return 1;
194

    
195
    if (s->bytes_xfer > s->xfer_limit)
196
        return 1;
197

    
198
    return 0;
199
}
200

    
201
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
202
{
203
    QEMUFileBuffered *s = opaque;
204
    if (qemu_file_get_error(s->file)) {
205
        goto out;
206
    }
207
    if (new_rate > SIZE_MAX) {
208
        new_rate = SIZE_MAX;
209
    }
210

    
211
    s->xfer_limit = new_rate / 10;
212
    
213
out:
214
    return s->xfer_limit;
215
}
216

    
217
static int64_t buffered_get_rate_limit(void *opaque)
218
{
219
    QEMUFileBuffered *s = opaque;
220
  
221
    return s->xfer_limit;
222
}
223

    
224
static void buffered_rate_tick(void *opaque)
225
{
226
    QEMUFileBuffered *s = opaque;
227

    
228
    if (qemu_file_get_error(s->file)) {
229
        buffered_close(s);
230
        return;
231
    }
232

    
233
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
234

    
235
    if (s->freeze_output)
236
        return;
237

    
238
    s->bytes_xfer = 0;
239

    
240
    buffered_put_buffer(s, NULL, 0, 0);
241
}
242

    
243
static const QEMUFileOps buffered_file_ops = {
244
    .get_fd =         buffered_get_fd,
245
    .put_buffer =     buffered_put_buffer,
246
    .close =          buffered_close,
247
    .rate_limit =     buffered_rate_limit,
248
    .get_rate_limit = buffered_get_rate_limit,
249
    .set_rate_limit = buffered_set_rate_limit,
250
};
251

    
252
QEMUFile *qemu_fopen_ops_buffered(MigrationState *migration_state)
253
{
254
    QEMUFileBuffered *s;
255

    
256
    s = g_malloc0(sizeof(*s));
257

    
258
    s->migration_state = migration_state;
259
    s->xfer_limit = migration_state->bandwidth_limit / 10;
260

    
261
    s->file = qemu_fopen_ops(s, &buffered_file_ops);
262

    
263
    s->timer = qemu_new_timer_ms(rt_clock, buffered_rate_tick, s);
264

    
265
    qemu_mod_timer(s->timer, qemu_get_clock_ms(rt_clock) + 100);
266

    
267
    return s->file;
268
}