Statistics
| Branch: | Revision:

root / buffered_file.c @ e7627482

History | View | Annotate | Download (6.2 kB)

1
/*
2
 * QEMU buffered QEMUFile
3
 *
4
 * Copyright IBM, Corp. 2008
5
 *
6
 * Authors:
7
 *  Anthony Liguori   <aliguori@us.ibm.com>
8
 *
9
 * This work is licensed under the terms of the GNU GPL, version 2.  See
10
 * the COPYING file in the top-level directory.
11
 *
12
 * Contributions after 2012-01-13 are licensed under the terms of the
13
 * GNU GPL, version 2 or (at your option) any later version.
14
 */
15

    
16
#include "qemu-common.h"
17
#include "hw/hw.h"
18
#include "qemu/timer.h"
19
#include "buffered_file.h"
20
#include "qemu/thread.h"
21

    
22
//#define DEBUG_BUFFERED_FILE
23

    
24
typedef struct QEMUFileBuffered
25
{
26
    MigrationState *migration_state;
27
    QEMUFile *file;
28
    size_t bytes_xfer;
29
    size_t xfer_limit;
30
    uint8_t *buffer;
31
    size_t buffer_size;
32
    size_t buffer_capacity;
33
    QemuThread thread;
34
} QEMUFileBuffered;
35

    
36
#ifdef DEBUG_BUFFERED_FILE
37
#define DPRINTF(fmt, ...) \
38
    do { printf("buffered-file: " fmt, ## __VA_ARGS__); } while (0)
39
#else
40
#define DPRINTF(fmt, ...) \
41
    do { } while (0)
42
#endif
43

    
44
static void buffered_append(QEMUFileBuffered *s,
45
                            const uint8_t *buf, size_t size)
46
{
47
    if (size > (s->buffer_capacity - s->buffer_size)) {
48
        DPRINTF("increasing buffer capacity from %zu by %zu\n",
49
                s->buffer_capacity, size + 1024);
50

    
51
        s->buffer_capacity += size + 1024;
52

    
53
        s->buffer = g_realloc(s->buffer, s->buffer_capacity);
54
    }
55

    
56
    memcpy(s->buffer + s->buffer_size, buf, size);
57
    s->buffer_size += size;
58
}
59

    
60
static ssize_t buffered_flush(QEMUFileBuffered *s)
61
{
62
    size_t offset = 0;
63
    ssize_t ret = 0;
64

    
65
    DPRINTF("flushing %zu byte(s) of data\n", s->buffer_size);
66

    
67
    while (s->bytes_xfer < s->xfer_limit && offset < s->buffer_size) {
68
        size_t to_send = MIN(s->buffer_size - offset, s->xfer_limit - s->bytes_xfer);
69
        ret = migrate_fd_put_buffer(s->migration_state, s->buffer + offset,
70
                                    to_send);
71
        if (ret <= 0) {
72
            DPRINTF("error flushing data, %zd\n", ret);
73
            break;
74
        } else {
75
            DPRINTF("flushed %zd byte(s)\n", ret);
76
            offset += ret;
77
            s->bytes_xfer += ret;
78
        }
79
    }
80

    
81
    DPRINTF("flushed %zu of %zu byte(s)\n", offset, s->buffer_size);
82
    memmove(s->buffer, s->buffer + offset, s->buffer_size - offset);
83
    s->buffer_size -= offset;
84

    
85
    if (ret < 0) {
86
        return ret;
87
    }
88
    return offset;
89
}
90

    
91
static int buffered_put_buffer(void *opaque, const uint8_t *buf, int64_t pos, int size)
92
{
93
    QEMUFileBuffered *s = opaque;
94
    ssize_t error;
95

    
96
    DPRINTF("putting %d bytes at %" PRId64 "\n", size, pos);
97

    
98
    error = qemu_file_get_error(s->file);
99
    if (error) {
100
        DPRINTF("flush when error, bailing: %s\n", strerror(-error));
101
        return error;
102
    }
103

    
104
    if (size > 0) {
105
        DPRINTF("buffering %d bytes\n", size - offset);
106
        buffered_append(s, buf, size);
107
    }
108

    
109
    error = buffered_flush(s);
110
    if (error < 0) {
111
        DPRINTF("buffered flush error. bailing: %s\n", strerror(-error));
112
        return error;
113
    }
114

    
115
    if (pos == 0 && size == 0) {
116
        DPRINTF("file is ready\n");
117
        if (s->bytes_xfer < s->xfer_limit) {
118
            DPRINTF("notifying client\n");
119
            migrate_fd_put_ready(s->migration_state);
120
        }
121
    }
122

    
123
    return size;
124
}
125

    
126
static int buffered_close(void *opaque)
127
{
128
    QEMUFileBuffered *s = opaque;
129
    ssize_t ret = 0;
130
    int ret2;
131

    
132
    DPRINTF("closing\n");
133

    
134
    s->xfer_limit = INT_MAX;
135
    while (!qemu_file_get_error(s->file) && s->buffer_size) {
136
        ret = buffered_flush(s);
137
        if (ret < 0) {
138
            break;
139
        }
140
    }
141

    
142
    ret2 = migrate_fd_close(s->migration_state);
143
    if (ret >= 0) {
144
        ret = ret2;
145
    }
146
    ret = migrate_fd_close(s->migration_state);
147
    s->migration_state->complete = true;
148
    return ret;
149
}
150

    
151
/*
152
 * The meaning of the return values is:
153
 *   0: We can continue sending
154
 *   1: Time to stop
155
 *   negative: There has been an error
156
 */
157
static int buffered_get_fd(void *opaque)
158
{
159
    QEMUFileBuffered *s = opaque;
160

    
161
    return qemu_get_fd(s->file);
162
}
163

    
164
static int buffered_rate_limit(void *opaque)
165
{
166
    QEMUFileBuffered *s = opaque;
167
    int ret;
168

    
169
    ret = qemu_file_get_error(s->file);
170
    if (ret) {
171
        return ret;
172
    }
173

    
174
    if (s->bytes_xfer > s->xfer_limit)
175
        return 1;
176

    
177
    return 0;
178
}
179

    
180
static int64_t buffered_set_rate_limit(void *opaque, int64_t new_rate)
181
{
182
    QEMUFileBuffered *s = opaque;
183
    if (qemu_file_get_error(s->file)) {
184
        goto out;
185
    }
186
    if (new_rate > SIZE_MAX) {
187
        new_rate = SIZE_MAX;
188
    }
189

    
190
    s->xfer_limit = new_rate / 10;
191
    
192
out:
193
    return s->xfer_limit;
194
}
195

    
196
static int64_t buffered_get_rate_limit(void *opaque)
197
{
198
    QEMUFileBuffered *s = opaque;
199
  
200
    return s->xfer_limit;
201
}
202

    
203
/* 10ms  xfer_limit is the limit that we should write each 10ms */
204
#define BUFFER_DELAY 100
205

    
206
static void *buffered_file_thread(void *opaque)
207
{
208
    QEMUFileBuffered *s = opaque;
209
    int64_t expire_time = qemu_get_clock_ms(rt_clock) + BUFFER_DELAY;
210

    
211
    while (true) {
212
        int64_t current_time = qemu_get_clock_ms(rt_clock);
213

    
214
        if (s->migration_state->complete) {
215
            break;
216
        }
217
        if (current_time >= expire_time) {
218
            s->bytes_xfer = 0;
219
            expire_time = current_time + BUFFER_DELAY;
220
        }
221
        if (s->bytes_xfer >= s->xfer_limit) {
222
            /* usleep expects microseconds */
223
            g_usleep((expire_time - current_time)*1000);
224
        }
225
        buffered_put_buffer(s, NULL, 0, 0);
226
    }
227
    g_free(s->buffer);
228
    g_free(s);
229
    return NULL;
230
}
231

    
232
static const QEMUFileOps buffered_file_ops = {
233
    .get_fd =         buffered_get_fd,
234
    .put_buffer =     buffered_put_buffer,
235
    .close =          buffered_close,
236
    .rate_limit =     buffered_rate_limit,
237
    .get_rate_limit = buffered_get_rate_limit,
238
    .set_rate_limit = buffered_set_rate_limit,
239
};
240

    
241
void qemu_fopen_ops_buffered(MigrationState *migration_state)
242
{
243
    QEMUFileBuffered *s;
244

    
245
    s = g_malloc0(sizeof(*s));
246

    
247
    s->migration_state = migration_state;
248
    s->xfer_limit = migration_state->bandwidth_limit / 10;
249
    s->migration_state->complete = false;
250

    
251
    s->file = qemu_fopen_ops(s, &buffered_file_ops);
252

    
253
    migration_state->file = s->file;
254

    
255
    qemu_thread_create(&s->thread, buffered_file_thread, s,
256
                       QEMU_THREAD_DETACHED);
257
}