Statistics
| Branch: | Revision:

root / block / nbd-client.c @ d0277315

History | View | Annotate | Download (11 kB)

1
/*
2
 * QEMU Block driver for  NBD
3
 *
4
 * Copyright (C) 2008 Bull S.A.S.
5
 *     Author: Laurent Vivier <Laurent.Vivier@bull.net>
6
 *
7
 * Some parts:
8
 *    Copyright (C) 2007 Anthony Liguori <anthony@codemonkey.ws>
9
 *
10
 * Permission is hereby granted, free of charge, to any person obtaining a copy
11
 * of this software and associated documentation files (the "Software"), to deal
12
 * in the Software without restriction, including without limitation the rights
13
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
14
 * copies of the Software, and to permit persons to whom the Software is
15
 * furnished to do so, subject to the following conditions:
16
 *
17
 * The above copyright notice and this permission notice shall be included in
18
 * all copies or substantial portions of the Software.
19
 *
20
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
21
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
22
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
23
 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
24
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
25
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
26
 * THE SOFTWARE.
27
 */
28

    
29
#include "nbd-client.h"
30
#include "qemu/sockets.h"
31

    
32
/*
 * Convert between a slot index and the 64-bit handle placed in a request.
 * The handle is the index XORed with the integer value of the pointer
 * passed as bs; since XOR is its own inverse, both directions are the same
 * operation.
 *
 * Every macro argument is parenthesized so that an expression argument
 * (for example "p + 1") is cast as a whole instead of having the cast bind
 * only to its first operand.
 */
#define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)(bs)))
#define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)(bs)))
34

    
35
/*
 * Enter every coroutine currently registered in s->recv_coroutine[].
 * Empty (NULL) slots are skipped.
 */
static void nbd_recv_coroutines_enter_all(NbdClientSession *s)
{
    int slot;

    for (slot = 0; slot < MAX_NBD_REQUESTS; slot++) {
        if (!s->recv_coroutine[slot]) {
            continue;
        }
        qemu_coroutine_enter(s->recv_coroutine[slot], NULL);
    }
}
45

    
46
static void nbd_reply_ready(void *opaque)
47
{
48
    NbdClientSession *s = opaque;
49
    uint64_t i;
50
    int ret;
51

    
52
    if (s->reply.handle == 0) {
53
        /* No reply already in flight.  Fetch a header.  It is possible
54
         * that another thread has done the same thing in parallel, so
55
         * the socket is not readable anymore.
56
         */
57
        ret = nbd_receive_reply(s->sock, &s->reply);
58
        if (ret == -EAGAIN) {
59
            return;
60
        }
61
        if (ret < 0) {
62
            s->reply.handle = 0;
63
            goto fail;
64
        }
65
    }
66

    
67
    /* There's no need for a mutex on the receive side, because the
68
     * handler acts as a synchronization point and ensures that only
69
     * one coroutine is called until the reply finishes.  */
70
    i = HANDLE_TO_INDEX(s, s->reply.handle);
71
    if (i >= MAX_NBD_REQUESTS) {
72
        goto fail;
73
    }
74

    
75
    if (s->recv_coroutine[i]) {
76
        qemu_coroutine_enter(s->recv_coroutine[i], NULL);
77
        return;
78
    }
79

    
80
fail:
81
    nbd_recv_coroutines_enter_all(s);
82
}
83

    
84
static void nbd_restart_write(void *opaque)
85
{
86
    NbdClientSession *s = opaque;
87

    
88
    qemu_coroutine_enter(s->send_coroutine, NULL);
89
}
90

    
91
static int nbd_co_send_request(NbdClientSession *s,
92
    struct nbd_request *request,
93
    QEMUIOVector *qiov, int offset)
94
{
95
    int rc, ret;
96

    
97
    qemu_co_mutex_lock(&s->send_mutex);
98
    s->send_coroutine = qemu_coroutine_self();
99
    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write, s);
100
    if (qiov) {
101
        if (!s->is_unix) {
102
            socket_set_cork(s->sock, 1);
103
        }
104
        rc = nbd_send_request(s->sock, request);
105
        if (rc >= 0) {
106
            ret = qemu_co_sendv(s->sock, qiov->iov, qiov->niov,
107
                                offset, request->len);
108
            if (ret != request->len) {
109
                rc = -EIO;
110
            }
111
        }
112
        if (!s->is_unix) {
113
            socket_set_cork(s->sock, 0);
114
        }
115
    } else {
116
        rc = nbd_send_request(s->sock, request);
117
    }
118
    qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL, s);
119
    s->send_coroutine = NULL;
120
    qemu_co_mutex_unlock(&s->send_mutex);
121
    return rc;
122
}
123

    
124
static void nbd_co_receive_reply(NbdClientSession *s,
125
    struct nbd_request *request, struct nbd_reply *reply,
126
    QEMUIOVector *qiov, int offset)
127
{
128
    int ret;
129

    
130
    /* Wait until we're woken up by the read handler.  TODO: perhaps
131
     * peek at the next reply and avoid yielding if it's ours?  */
132
    qemu_coroutine_yield();
133
    *reply = s->reply;
134
    if (reply->handle != request->handle) {
135
        reply->error = EIO;
136
    } else {
137
        if (qiov && reply->error == 0) {
138
            ret = qemu_co_recvv(s->sock, qiov->iov, qiov->niov,
139
                                offset, request->len);
140
            if (ret != request->len) {
141
                reply->error = EIO;
142
            }
143
        }
144

    
145
        /* Tell the read handler to read another header.  */
146
        s->reply.handle = 0;
147
    }
148
}
149

    
150
/*
 * Register the current coroutine as the receiver for a new request.
 *
 * Increments s->in_flight, stores the current coroutine in the first free
 * slot of s->recv_coroutine[], and sets request->handle to the handle
 * derived from that slot index.
 */
static void nbd_coroutine_start(NbdClientSession *s,
   struct nbd_request *request)
{
    int slot;

    /* free_sema acts as a poor man's semaphore: it is locked when no
     * further request can be accepted and unlocked after one reply has
     * been received.  */
    if (s->in_flight >= MAX_NBD_REQUESTS - 1) {
        qemu_co_mutex_lock(&s->free_sema);
        assert(s->in_flight < MAX_NBD_REQUESTS);
    }
    s->in_flight++;

    /* Claim the first unused slot.  */
    for (slot = 0; slot < MAX_NBD_REQUESTS; slot++) {
        if (s->recv_coroutine[slot] != NULL) {
            continue;
        }
        s->recv_coroutine[slot] = qemu_coroutine_self();
        break;
    }

    assert(slot < MAX_NBD_REQUESTS);
    request->handle = INDEX_TO_HANDLE(s, slot);
}
173

    
174
/*
 * Release the receive slot belonging to request and decrement
 * s->in_flight.  free_sema is unlocked when the counter was at
 * MAX_NBD_REQUESTS before the decrement.
 */
static void nbd_coroutine_end(NbdClientSession *s,
    struct nbd_request *request)
{
    int slot = HANDLE_TO_INDEX(s, request->handle);

    s->recv_coroutine[slot] = NULL;

    if (s->in_flight-- != MAX_NBD_REQUESTS) {
        return;
    }
    qemu_co_mutex_unlock(&s->free_sema);
}
183

    
184
/*
 * Issue a single NBD_CMD_READ for nb_sectors 512-byte sectors starting at
 * sector_num, receiving the data into qiov at byte position offset.
 *
 * Returns the negated reply error: 0 on success, negative otherwise.  A
 * failure to send the request is reported through the same value.
 */
static int nbd_co_readv_1(NbdClientSession *client, int64_t sector_num,
                          int nb_sectors, QEMUIOVector *qiov,
                          int offset)
{
    struct nbd_request request = {
        .type = NBD_CMD_READ,
        .from = sector_num * 512,
        .len = nb_sectors * 512,
    };
    struct nbd_reply reply;
    ssize_t rc;

    nbd_coroutine_start(client, &request);

    rc = nbd_co_send_request(client, &request, NULL, 0);
    if (rc >= 0) {
        nbd_co_receive_reply(client, &request, &reply, qiov, offset);
    } else {
        /* Nothing was sent; report the send error as the reply error.  */
        reply.error = -rc;
    }

    nbd_coroutine_end(client, &request);
    return -reply.error;
}
206

    
207
/*
 * Issue a single NBD_CMD_WRITE for nb_sectors 512-byte sectors starting at
 * sector_num, sending the data from qiov at byte position offset.
 *
 * NBD_CMD_FLAG_FUA is added to the command when
 * bdrv_enable_write_cache(client->bs) returns false and the session flags
 * contain NBD_FLAG_SEND_FUA.
 *
 * Returns the negated reply error: 0 on success, negative otherwise.  A
 * failure to send the request is reported through the same value.
 */
static int nbd_co_writev_1(NbdClientSession *client, int64_t sector_num,
                           int nb_sectors, QEMUIOVector *qiov,
                           int offset)
{
    struct nbd_request request = { .type = NBD_CMD_WRITE };
    struct nbd_reply reply;
    ssize_t rc;

    if (!bdrv_enable_write_cache(client->bs)) {
        if (client->nbdflags & NBD_FLAG_SEND_FUA) {
            request.type |= NBD_CMD_FLAG_FUA;
        }
    }

    request.from = sector_num * 512;
    request.len = nb_sectors * 512;

    nbd_coroutine_start(client, &request);

    rc = nbd_co_send_request(client, &request, qiov, offset);
    if (rc >= 0) {
        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
    } else {
        /* Sending failed; report the send error as the reply error.  */
        reply.error = -rc;
    }

    nbd_coroutine_end(client, &request);
    return -reply.error;
}
233

    
234
/* Largest number of 512-byte sectors sent in one request.  qemu-nbd limits
 * a request to slightly less than 1M; 2040 sectors (1020 KiB) stays below
 * that while remaining a multiple of 4K. */
#define NBD_MAX_SECTORS 2040
237

    
238
/*
 * Read nb_sectors sectors starting at sector_num into qiov.
 *
 * The read is split into requests of at most NBD_MAX_SECTORS sectors
 * issued one after the other; the final request carries whatever remains.
 * Returns the first negative result, or the result of the final request.
 */
int nbd_client_session_co_readv(NbdClientSession *client, int64_t sector_num,
    int nb_sectors, QEMUIOVector *qiov)
{
    int offset;

    for (offset = 0; nb_sectors > NBD_MAX_SECTORS;
         offset += NBD_MAX_SECTORS * 512,
         sector_num += NBD_MAX_SECTORS,
         nb_sectors -= NBD_MAX_SECTORS) {
        int rc = nbd_co_readv_1(client, sector_num,
                                NBD_MAX_SECTORS, qiov, offset);

        if (rc < 0) {
            return rc;
        }
    }

    return nbd_co_readv_1(client, sector_num, nb_sectors, qiov, offset);
}
255

    
256
/*
 * Write nb_sectors sectors starting at sector_num from qiov.
 *
 * The write is split into requests of at most NBD_MAX_SECTORS sectors
 * issued one after the other; the final request carries whatever remains.
 * Returns the first negative result, or the result of the final request.
 */
int nbd_client_session_co_writev(NbdClientSession *client, int64_t sector_num,
                                 int nb_sectors, QEMUIOVector *qiov)
{
    const int chunk_bytes = NBD_MAX_SECTORS * 512;
    int done = 0;   /* bytes of qiov already written */

    while (nb_sectors > NBD_MAX_SECTORS) {
        int rc = nbd_co_writev_1(client, sector_num,
                                 NBD_MAX_SECTORS, qiov, done);

        if (rc < 0) {
            return rc;
        }

        done += chunk_bytes;
        sector_num += NBD_MAX_SECTORS;
        nb_sectors -= NBD_MAX_SECTORS;
    }

    return nbd_co_writev_1(client, sector_num, nb_sectors, qiov, done);
}
273

    
274
/*
 * Send an NBD_CMD_FLUSH request and wait for its reply.
 *
 * Returns 0 without sending anything when the session flags lack
 * NBD_FLAG_SEND_FLUSH.  NBD_CMD_FLAG_FUA is added to the command when the
 * flags contain NBD_FLAG_SEND_FUA.  Otherwise returns the negated reply
 * error (0 on success); a send failure is reported through the same value.
 */
int nbd_client_session_co_flush(NbdClientSession *client)
{
    struct nbd_request request = {
        .type = NBD_CMD_FLUSH,
        .from = 0,
        .len = 0,
    };
    struct nbd_reply reply;
    ssize_t rc;

    if ((client->nbdflags & NBD_FLAG_SEND_FLUSH) == 0) {
        return 0;
    }

    if (client->nbdflags & NBD_FLAG_SEND_FUA) {
        request.type |= NBD_CMD_FLAG_FUA;
    }

    nbd_coroutine_start(client, &request);

    rc = nbd_co_send_request(client, &request, NULL, 0);
    if (rc >= 0) {
        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
    } else {
        /* Sending failed; report the send error as the reply error.  */
        reply.error = -rc;
    }

    nbd_coroutine_end(client, &request);
    return -reply.error;
}
301

    
302
/*
 * Send an NBD_CMD_TRIM request covering nb_sectors 512-byte sectors
 * starting at sector_num and wait for its reply.
 *
 * Returns 0 without sending anything when the session flags lack
 * NBD_FLAG_SEND_TRIM.  Otherwise returns the negated reply error (0 on
 * success); a send failure is reported through the same value.
 */
int nbd_client_session_co_discard(NbdClientSession *client, int64_t sector_num,
    int nb_sectors)
{
    struct nbd_request request = { .type = NBD_CMD_TRIM };
    struct nbd_reply reply;
    ssize_t rc;

    if ((client->nbdflags & NBD_FLAG_SEND_TRIM) == 0) {
        return 0;
    }

    request.from = sector_num * 512;
    request.len = nb_sectors * 512;

    nbd_coroutine_start(client, &request);

    rc = nbd_co_send_request(client, &request, NULL, 0);
    if (rc >= 0) {
        nbd_co_receive_reply(client, &request, &reply, NULL, 0);
    } else {
        /* Sending failed; report the send error as the reply error.  */
        reply.error = -rc;
    }

    nbd_coroutine_end(client, &request);
    return -reply.error;
}
326

    
327
/*
 * Send NBD_CMD_DISC, shut the socket down, enter all registered receive
 * coroutines, then remove the fd handlers, close the socket and set
 * client->sock to -1.  The result of sending the disconnect request is
 * not checked.
 */
static void nbd_teardown_connection(NbdClientSession *client)
{
    struct nbd_request disconnect = { .type = NBD_CMD_DISC };

    disconnect.from = 0;
    disconnect.len = 0;
    nbd_send_request(client->sock, &disconnect);

    /* Shut the socket down and finish any pending coroutines.  */
    shutdown(client->sock, 2);
    nbd_recv_coroutines_enter_all(client);

    /* Unregister the handlers before the descriptor is closed.  */
    qemu_aio_set_fd_handler(client->sock, NULL, NULL, NULL);
    closesocket(client->sock);
    client->sock = -1;
}
345

    
346
/*
 * Tear down the connection of a session and clear client->bs.  Does
 * nothing when client->bs is already NULL.
 */
void nbd_client_session_close(NbdClientSession *client)
{
    if (client->bs) {
        nbd_teardown_connection(client);
        client->bs = NULL;
    }
}
355

    
356
/*
 * Negotiate with the NBD server on sock for the given export and set up
 * the session.
 *
 * The handshake runs with the socket in blocking mode and fills in
 * client->nbdflags, client->size and client->blocksize.  On failure the
 * socket is closed and the negative result of the negotiation is returned.
 * On success the session fields and mutexes are initialized, the socket is
 * switched to non-blocking mode, nbd_reply_ready is installed as its read
 * handler, and 0 is returned.
 */
int nbd_client_session_init(NbdClientSession *client, BlockDriverState *bs,
    int sock, const char *export)
{
    int rc;

    /* NBD handshake, done on a blocking socket.  */
    logout("session init %s\n", export);
    qemu_set_block(sock);
    rc = nbd_receive_negotiate(sock, export,
                               &client->nbdflags, &client->size,
                               &client->blocksize);
    if (rc < 0) {
        logout("Failed to negotiate with the NBD server\n");
        closesocket(sock);
        return rc;
    }

    client->bs = bs;
    client->sock = sock;
    qemu_co_mutex_init(&client->send_mutex);
    qemu_co_mutex_init(&client->free_sema);

    /* Connected: from here on the socket is non-blocking and replies
     * are dispatched by the read handler.  */
    qemu_set_nonblock(sock);
    qemu_aio_set_fd_handler(sock, nbd_reply_ready, NULL, client);

    logout("Established connection with NBD server\n");
    return 0;
}