Statistics
| Branch: | Revision:

root / block / sheepdog.c @ d5124c00

History | View | Annotate | Download (64 kB)

1
/*
2
 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3
 *
4
 * This program is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU General Public License version
6
 * 2 as published by the Free Software Foundation.
7
 *
8
 * You should have received a copy of the GNU General Public License
9
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
10
 *
11
 * Contributions after 2012-01-13 are licensed under the terms of the
12
 * GNU GPL, version 2 or (at your option) any later version.
13
 */
14

    
15
#include "qemu-common.h"
16
#include "qemu/uri.h"
17
#include "qemu/error-report.h"
18
#include "qemu/sockets.h"
19
#include "block/block_int.h"
20
#include "qemu/bitops.h"
21

    
22
/* Protocol version stored in the proto_ver field of request headers. */
#define SD_PROTO_VER 0x01

/* Endpoint used when a TCP URI omits the server and/or the port. */
#define SD_DEFAULT_ADDR "localhost"
#define SD_DEFAULT_PORT 7000

/* Opcodes that operate on a single object. */
#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
#define SD_OP_READ_OBJ       0x02
#define SD_OP_WRITE_OBJ      0x03
/* 0x04 is used internally by Sheepdog */
#define SD_OP_DISCARD_OBJ    0x05

/* Opcodes that operate on a vdi. */
#define SD_OP_NEW_VDI        0x11
#define SD_OP_LOCK_VDI       0x12
#define SD_OP_RELEASE_VDI    0x13
#define SD_OP_GET_VDI_INFO   0x14
#define SD_OP_READ_VDIS      0x15
#define SD_OP_FLUSH_VDI      0x16
#define SD_OP_DEL_VDI        0x17

/* Bits for the `flags' field of a request header. */
#define SD_FLAG_CMD_WRITE    0x01
#define SD_FLAG_CMD_COW      0x02
#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */

/* Values of the `result' field of a response header. */
#define SD_RES_SUCCESS       0x00 /* Success */
#define SD_RES_UNKNOWN       0x01 /* Unknown error */
#define SD_RES_NO_OBJ        0x02 /* No object found */
#define SD_RES_EIO           0x03 /* I/O error */
#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
#define SD_RES_NO_VDI        0x08 /* No vdi found */
#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
#define SD_RES_READONLY      0x1A /* Object is read-only */
73

    
74
/*
75
 * Object ID rules
76
 *
77
 *  0 - 19 (20 bits): data object space
78
 * 20 - 31 (12 bits): reserved data object space
79
 * 32 - 55 (24 bits): vdi object space
80
 * 56 - 59 ( 4 bits): reserved vdi object space
81
 * 60 - 63 ( 4 bits): object type identifier space
82
 */
83

    
84
/* Bit position at which the vdi id is placed inside a 64-bit object id. */
#define VDI_SPACE_SHIFT   32
/* Object id bit set for vdi objects; clear for data objects. */
#define VDI_BIT (UINT64_C(1) << 63)
/* Object id bit set for VM state objects. */
#define VMSTATE_BIT (UINT64_C(1) << 62)
/* Number of data object slots per vdi (size of the data_vdi_id table). */
#define MAX_DATA_OBJS (UINT64_C(1) << 20)
#define MAX_CHILDREN 1024
/* Sizes of the fixed-width name and tag fields. */
#define SD_MAX_VDI_LEN 256
#define SD_MAX_VDI_TAG_LEN 256
#define SD_NR_VDIS   (1U << 24)
/* Size of one data object: 2^22 bytes. */
#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
/* Largest vdi: every data object slot in use. */
#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)

#define SD_INODE_SIZE (sizeof(SheepdogInode))
/* Snapshot id that selects the current (non-snapshot) vdi. */
#define CURRENT_VDI_ID 0
97

    
98
/*
 * Generic request header.  The specialised request structs below share
 * the first six fields and are cast to this type when handed to do_req().
 */
typedef struct SheepdogReq {
    uint8_t proto_ver;              /* SD_PROTO_VER */
    uint8_t opcode;                 /* one of SD_OP_* */
    uint16_t flags;                 /* SD_FLAG_CMD_* bits */
    uint32_t epoch;
    uint32_t id;                    /* request id, matched against rsp.id */
    uint32_t data_length;           /* length of the payload after the header */
    uint32_t opcode_specific[8];    /* layout depends on the opcode */
} SheepdogReq;
107

    
108
/*
 * Generic response header; same leading fields as SheepdogReq followed
 * by the result code.
 */
typedef struct SheepdogRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;           /* length of the payload after the header */
    uint32_t result;                /* one of SD_RES_* */
    uint32_t opcode_specific[7];    /* layout depends on the opcode */
} SheepdogRsp;
118

    
119
/* Request header for object operations (read/write/create/discard). */
typedef struct SheepdogObjReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint64_t oid;           /* object the request targets */
    uint64_t cow_oid;       /* filled from AIOReq.base_oid */
    uint32_t copies;        /* filled from the inode's nr_copies */
    uint32_t rsvd;
    uint64_t offset;        /* byte offset inside the object */
} SheepdogObjReq;
132

    
133
/* Response header for object operations. */
typedef struct SheepdogObjRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;            /* id of the request being answered */
    uint32_t data_length;   /* length of the payload after the header */
    uint32_t result;        /* one of SD_RES_* */
    uint32_t copies;
    uint32_t pad[6];
} SheepdogObjRsp;
144

    
145
/* Request header for vdi operations (lookup, lock, ...). */
typedef struct SheepdogVdiReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint64_t vdi_size;
    uint32_t vdi_id;
    uint32_t copies;
    uint32_t snapid;        /* snapshot id; CURRENT_VDI_ID for the live vdi */
    uint32_t pad[3];
} SheepdogVdiReq;
158

    
159
/* Response header for vdi operations. */
typedef struct SheepdogVdiRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;        /* one of SD_RES_* */
    uint32_t rsvd;
    uint32_t vdi_id;        /* id of the vdi the request resolved to */
    uint32_t pad[5];
} SheepdogVdiRsp;
171

    
172
typedef struct SheepdogInode {
173
    char name[SD_MAX_VDI_LEN];
174
    char tag[SD_MAX_VDI_TAG_LEN];
175
    uint64_t ctime;
176
    uint64_t snap_ctime;
177
    uint64_t vm_clock_nsec;
178
    uint64_t vdi_size;
179
    uint64_t vm_state_size;
180
    uint16_t copy_policy;
181
    uint8_t nr_copies;
182
    uint8_t block_size_shift;
183
    uint32_t snap_id;
184
    uint32_t vdi_id;
185
    uint32_t parent_vdi_id;
186
    uint32_t child_vdi_id[MAX_CHILDREN];
187
    uint32_t data_vdi_id[MAX_DATA_OBJS];
188
} SheepdogInode;
189

    
190
/*
 * 64 bit FNV-1a non-zero initial basis
 */
#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)

/*
 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 *
 * Folds `len' bytes starting at `buf' into `hval' and returns the new
 * hash value.  Pass FNV1A_64_INIT as `hval' for the first buffer, or a
 * previous return value to continue hashing.
 */
static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
{
    const unsigned char *bytes = buf;
    size_t pos;

    for (pos = 0; pos < len; pos++) {
        /* xor the low 8 bits with the next octet ... */
        hval ^= (uint64_t)bytes[pos];
        /* ... then multiply by the 64 bit FNV prime (2^40 + 0x1b3) */
        hval *= UINT64_C(0x100000001b3);
    }
    return hval;
}
209

    
210
/*
 * Return true when data object slot `idx' is owned by this vdi itself,
 * i.e. its data_vdi_id entry equals the inode's own vdi_id.
 */
static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
{
    uint32_t owner_vid = inode->data_vdi_id[idx];

    return owner_vid == inode->vdi_id;
}
214

    
215
static inline bool is_data_obj(uint64_t oid)
216
{
217
    return !(VDI_BIT & oid);
218
}
219

    
220
static inline uint64_t data_oid_to_idx(uint64_t oid)
221
{
222
    return oid & (MAX_DATA_OBJS - 1);
223
}
224

    
225
static inline uint64_t vid_to_vdi_oid(uint32_t vid)
226
{
227
    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
228
}
229

    
230
static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
231
{
232
    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
233
}
234

    
235
static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
236
{
237
    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
238
}
239

    
240
static inline bool is_snapshot(struct SheepdogInode *inode)
241
{
242
    return !!inode->snap_ctime;
243
}
244

    
245
/*
 * Debug tracing.  When DEBUG_SDOG is defined, DPRINTF prints the
 * message to stdout prefixed with the calling function and line number;
 * otherwise it expands to nothing.
 */
#undef DPRINTF
#ifdef DEBUG_SDOG
#define DPRINTF(fmt, args...)                                       \
    do {                                                            \
        fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
    } while (0)
#else
#define DPRINTF(fmt, args...)
#endif
254

    
255
typedef struct SheepdogAIOCB SheepdogAIOCB;
256

    
257
typedef struct AIOReq {
258
    SheepdogAIOCB *aiocb;
259
    unsigned int iov_offset;
260

    
261
    uint64_t oid;
262
    uint64_t base_oid;
263
    uint64_t offset;
264
    unsigned int data_len;
265
    uint8_t flags;
266
    uint32_t id;
267

    
268
    QLIST_ENTRY(AIOReq) aio_siblings;
269
} AIOReq;
270

    
271
/* Kind of operation an AIOCB performs; selects the opcode that is sent. */
enum AIOCBState {
    AIOCB_WRITE_UDATA,      /* SD_OP_WRITE_OBJ or SD_OP_CREATE_AND_WRITE_OBJ */
    AIOCB_READ_UDATA,       /* SD_OP_READ_OBJ */
    AIOCB_FLUSH_CACHE,      /* SD_OP_FLUSH_VDI */
    AIOCB_DISCARD_OBJ,      /* SD_OP_DISCARD_OBJ */
};
277

    
278
struct SheepdogAIOCB {
279
    BlockDriverAIOCB common;
280

    
281
    QEMUIOVector *qiov;
282

    
283
    int64_t sector_num;
284
    int nb_sectors;
285

    
286
    int ret;
287
    enum AIOCBState aiocb_type;
288

    
289
    Coroutine *coroutine;
290
    void (*aio_done_func)(SheepdogAIOCB *);
291

    
292
    bool canceled;
293
    int nr_pending;
294
};
295

    
296
typedef struct BDRVSheepdogState {
297
    SheepdogInode inode;
298

    
299
    uint32_t min_dirty_data_idx;
300
    uint32_t max_dirty_data_idx;
301

    
302
    char name[SD_MAX_VDI_LEN];
303
    bool is_snapshot;
304
    uint32_t cache_flags;
305
    bool discard_supported;
306

    
307
    char *host_spec;
308
    bool is_unix;
309
    int fd;
310

    
311
    CoMutex lock;
312
    Coroutine *co_send;
313
    Coroutine *co_recv;
314

    
315
    uint32_t aioreq_seq_num;
316
    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
317
    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
318
} BDRVSheepdogState;
319

    
320
static const char * sd_strerror(int err)
321
{
322
    int i;
323

    
324
    static const struct {
325
        int err;
326
        const char *desc;
327
    } errors[] = {
328
        {SD_RES_SUCCESS, "Success"},
329
        {SD_RES_UNKNOWN, "Unknown error"},
330
        {SD_RES_NO_OBJ, "No object found"},
331
        {SD_RES_EIO, "I/O error"},
332
        {SD_RES_VDI_EXIST, "VDI exists already"},
333
        {SD_RES_INVALID_PARMS, "Invalid parameters"},
334
        {SD_RES_SYSTEM_ERROR, "System error"},
335
        {SD_RES_VDI_LOCKED, "VDI is already locked"},
336
        {SD_RES_NO_VDI, "No vdi found"},
337
        {SD_RES_NO_BASE_VDI, "No base VDI found"},
338
        {SD_RES_VDI_READ, "Failed read the requested VDI"},
339
        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
340
        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
341
        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
342
        {SD_RES_NO_TAG, "Failed to find the requested tag"},
343
        {SD_RES_STARTUP, "The system is still booting"},
344
        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
345
        {SD_RES_SHUTDOWN, "The system is shutting down"},
346
        {SD_RES_NO_MEM, "Out of memory on the server"},
347
        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
348
        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
349
        {SD_RES_NO_SPACE, "Server has no space for new objects"},
350
        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
351
        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
352
        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
353
        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
354
        {SD_RES_READONLY, "Object is read-only"},
355
    };
356

    
357
    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
358
        if (errors[i].err == err) {
359
            return errors[i].desc;
360
        }
361
    }
362

    
363
    return "Invalid error code";
364
}
365

    
366
/*
367
 * Sheepdog I/O handling:
368
 *
369
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
370
 *    link the requests to the inflight_list in the
371
 *    BDRVSheepdogState.  The function exits without waiting for
372
 *    receiving the response.
373
 *
374
 * 2. We receive the response in aio_read_response, the fd handler to
375
 *    the sheepdog connection.  If metadata update is needed, we send
376
 *    the write request to the vdi object in sd_write_done, the write
377
 *    completion function.  We switch back to sd_co_readv/writev after
378
 *    all the requests belonging to the AIOCB are finished.
379
 */
380

    
381
/*
 * Allocate an AIOReq for `acb', fill it from the arguments and give it
 * the next sequence number of `s'.  The AIOCB's nr_pending counter is
 * incremented.  The request is not linked into any list; that is left
 * to the caller.  Release it with free_aio_req().
 */
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
                                    uint64_t oid, unsigned int data_len,
                                    uint64_t offset, uint8_t flags,
                                    uint64_t base_oid, unsigned int iov_offset)
{
    AIOReq *req = g_malloc(sizeof(*req));

    /* ownership and position in the caller's iovec */
    req->aiocb = acb;
    req->iov_offset = iov_offset;

    /* what to access */
    req->oid = oid;
    req->base_oid = base_oid;
    req->offset = offset;
    req->data_len = data_len;
    req->flags = flags;

    /* id used to match the response to this request */
    req->id = s->aioreq_seq_num++;

    acb->nr_pending++;
    return req;
}
401

    
402
/*
 * Unlink `aio_req' from the list it is on, free it, and decrement the
 * nr_pending counter of the AIOCB it belonged to.
 */
static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
{
    /* fetch the owner first: aio_req is gone after g_free() */
    SheepdogAIOCB *owner = aio_req->aiocb;

    QLIST_REMOVE(aio_req, aio_siblings);
    owner->nr_pending--;
    g_free(aio_req);
}
411

    
412
/*
 * Complete an AIOCB: re-enter the coroutine that issued it, unless the
 * request was canceled, then release the AIOCB.
 */
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
{
    if (acb->canceled) {
        /* sd_aio_cancel() already re-entered the coroutine */
        qemu_aio_release(acb);
        return;
    }

    qemu_coroutine_enter(acb->coroutine, NULL);
    qemu_aio_release(acb);
}
419

    
420
/*
 * Cancel callback of sd_aiocb_info.
 *
 * Sheepdog cannot cancel the requests which are already sent to the
 * servers, so the request is completed right away with -EIO: the
 * issuing coroutine is re-entered and the AIOCB is then flagged as
 * canceled so that sd_finish_aiocb() does not enter it a second time.
 */
static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
{
    SheepdogAIOCB *sd_acb = (SheepdogAIOCB *)blockacb;

    sd_acb->ret = -EIO;
    qemu_coroutine_enter(sd_acb->coroutine, NULL);
    sd_acb->canceled = true;
}
432

    
433
static const AIOCBInfo sd_aiocb_info = {
434
    .aiocb_size = sizeof(SheepdogAIOCB),
435
    .cancel = sd_aio_cancel,
436
};
437

    
438
/*
 * Obtain a SheepdogAIOCB for an I/O on `bs' and initialise it for the
 * calling coroutine.  aiocb_type and aio_done_func are left for the
 * caller to set (aio_done_func starts out NULL).
 */
static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
                                   int64_t sector_num, int nb_sectors)
{
    SheepdogAIOCB *new_acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);

    /* describe the request */
    new_acb->qiov = qiov;
    new_acb->sector_num = sector_num;
    new_acb->nb_sectors = nb_sectors;

    /* nothing sent yet, no error yet */
    new_acb->ret = 0;
    new_acb->nr_pending = 0;
    new_acb->canceled = false;
    new_acb->aio_done_func = NULL;

    /* coroutine to resume once the request has completed */
    new_acb->coroutine = qemu_coroutine_self();

    return new_acb;
}
457

    
458
/*
 * Open a new connection to the server named by s->host_spec, over a
 * unix socket or TCP depending on s->is_unix.
 *
 * On success the socket is switched to non-blocking mode (and, for TCP,
 * nodelay is requested; a failure to set it is only reported).  On a
 * connect error the error is reported and freed.  In both cases the
 * value returned by the connect helper is passed back to the caller.
 */
static int connect_to_sdog(BDRVSheepdogState *s)
{
    Error *local_err = NULL;
    int sock;

    sock = s->is_unix ? unix_connect(s->host_spec, &local_err)
                      : inet_connect(s->host_spec, &local_err);

    if (local_err != NULL) {
        qerror_report_err(local_err);
        error_free(local_err);
        return sock;
    }

    if (!s->is_unix && socket_set_nodelay(sock) < 0) {
        error_report("%s", strerror(errno));
    }

    qemu_set_nonblock(sock);
    return sock;
}
485

    
486
/*
 * Send a request header followed by *wlen bytes of payload on `sockfd'.
 *
 * Returns the (non-negative) result of the payload send on success and
 * a negative value if either the header or the payload could not be
 * sent completely.
 *
 * The results are compared with `!=' rather than `<': `ret' is a signed
 * int while sizeof() and *wlen are unsigned, so `ret < sizeof(*hdr)'
 * would convert a negative `ret' to a huge unsigned value and miss the
 * error.  A short transfer is reported as -EIO so that callers testing
 * `ret < 0' see it as a failure.
 * NOTE(review): assumes qemu_co_send() returns the byte count or a
 * negative value with errno set -- confirm against its definition.
 */
static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
                                    unsigned int *wlen)
{
    int ret;

    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
    if (ret != sizeof(*hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        return ret < 0 ? ret : -EIO;
    }

    ret = qemu_co_send(sockfd, data, *wlen);
    if (ret != *wlen) {
        error_report("failed to send a req, %s", strerror(errno));
        return ret < 0 ? ret : -EIO;
    }

    return ret;
}
504

    
505
static void restart_co_req(void *opaque)
506
{
507
    Coroutine *co = opaque;
508

    
509
    qemu_coroutine_enter(co, NULL);
510
}
511

    
512
typedef struct SheepdogReqCo {
513
    int sockfd;
514
    SheepdogReq *hdr;
515
    void *data;
516
    unsigned int *wlen;
517
    unsigned int *rlen;
518
    int ret;
519
    bool finished;
520
} SheepdogReqCo;
521

    
522
static coroutine_fn void do_co_req(void *opaque)
523
{
524
    int ret;
525
    Coroutine *co;
526
    SheepdogReqCo *srco = opaque;
527
    int sockfd = srco->sockfd;
528
    SheepdogReq *hdr = srco->hdr;
529
    void *data = srco->data;
530
    unsigned int *wlen = srco->wlen;
531
    unsigned int *rlen = srco->rlen;
532

    
533
    co = qemu_coroutine_self();
534
    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co);
535

    
536
    ret = send_co_req(sockfd, hdr, data, wlen);
537
    if (ret < 0) {
538
        goto out;
539
    }
540

    
541
    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
542

    
543
    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
544
    if (ret < sizeof(*hdr)) {
545
        error_report("failed to get a rsp, %s", strerror(errno));
546
        ret = -errno;
547
        goto out;
548
    }
549

    
550
    if (*rlen > hdr->data_length) {
551
        *rlen = hdr->data_length;
552
    }
553

    
554
    if (*rlen) {
555
        ret = qemu_co_recv(sockfd, data, *rlen);
556
        if (ret < *rlen) {
557
            error_report("failed to get the data, %s", strerror(errno));
558
            ret = -errno;
559
            goto out;
560
        }
561
    }
562
    ret = 0;
563
out:
564
    /* there is at most one request for this sockfd, so it is safe to
565
     * set each handler to NULL. */
566
    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL);
567

    
568
    srco->ret = ret;
569
    srco->finished = true;
570
}
571

    
572
/*
 * Perform one request/response exchange on `sockfd' and return its
 * result (see do_co_req()).
 *
 * When already running in a coroutine the exchange is done in place.
 * Otherwise a coroutine is created for it and the caller waits with
 * qemu_aio_wait() until it has finished.
 */
static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                  unsigned int *wlen, unsigned int *rlen)
{
    SheepdogReqCo args = {
        .sockfd = sockfd,
        .hdr = hdr,
        .data = data,
        .wlen = wlen,
        .rlen = rlen,
        .ret = 0,
        .finished = false,
    };
    Coroutine *worker;

    if (qemu_in_coroutine()) {
        do_co_req(&args);
        return args.ret;
    }

    worker = qemu_coroutine_create(do_co_req);
    qemu_coroutine_enter(worker, &args);
    while (!args.finished) {
        qemu_aio_wait();
    }

    return args.ret;
}
598

    
599
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
600
                           struct iovec *iov, int niov, bool create,
601
                           enum AIOCBState aiocb_type);
602
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
603

    
604

    
605
/*
 * Return the first request on the pending list that targets object
 * `oid', or NULL when there is none.  The request stays on the list.
 */
static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
    AIOReq *cur;

    QLIST_FOREACH(cur, &s->pending_aio_head, aio_siblings) {
        if (cur->oid != oid) {
            continue;
        }
        return cur;
    }

    return NULL;
}
617

    
618
/*
619
 * This function searchs pending requests to the object `oid', and
620
 * sends them.
621
 */
622
static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
623
{
624
    AIOReq *aio_req;
625
    SheepdogAIOCB *acb;
626
    int ret;
627

    
628
    while ((aio_req = find_pending_req(s, oid)) != NULL) {
629
        acb = aio_req->aiocb;
630
        /* move aio_req from pending list to inflight one */
631
        QLIST_REMOVE(aio_req, aio_siblings);
632
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
633
        ret = add_aio_request(s, aio_req, acb->qiov->iov,
634
                              acb->qiov->niov, false, acb->aiocb_type);
635
        if (ret < 0) {
636
            error_report("add_aio_request is failed");
637
            free_aio_req(s, aio_req);
638
            if (!acb->nr_pending) {
639
                sd_finish_aiocb(acb);
640
            }
641
        }
642
    }
643
}
644

    
645
/*
646
 * Receive responses of the I/O requests.
647
 *
648
 * This function is registered as a fd handler, and called from the
649
 * main loop when s->fd is ready for reading responses.
650
 */
651
/*
 * Read one response from s->fd and complete the matching inflight
 * request.
 *
 * `opaque' is the BDRVSheepdogState.  The response header is read, the
 * AIOReq with the same id is looked up on the inflight list, and the
 * response is then handled according to the type of the owning AIOCB.
 * When the last request of an AIOCB has been answered its
 * aio_done_func is called.  s->co_recv is reset on every exit path.
 *
 * Both receives require the full length: a short header would leave
 * `rsp' partly uninitialised, and a short payload would leave the
 * caller's buffers partly unfilled, so either is treated as a failure
 * rather than only checking for a negative return.
 */
static void coroutine_fn aio_read_response(void *opaque)
{
    SheepdogObjRsp rsp;
    BDRVSheepdogState *s = opaque;
    int fd = s->fd;
    int ret;
    AIOReq *aio_req = NULL;
    SheepdogAIOCB *acb;
    uint64_t idx;

    if (QLIST_EMPTY(&s->inflight_aio_head)) {
        goto out;
    }

    /* read a header */
    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
    if (ret != sizeof(rsp)) {
        error_report("failed to get the header, %s", strerror(errno));
        goto out;
    }

    /* find the right aio_req from the inflight aio list */
    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
        if (aio_req->id == rsp.id) {
            break;
        }
    }
    if (!aio_req) {
        error_report("cannot find aio_req %x", rsp.id);
        goto out;
    }

    acb = aio_req->aiocb;

    switch (acb->aiocb_type) {
    case AIOCB_WRITE_UDATA:
        /* this coroutine context is no longer suitable for co_recv
         * because we may send data to update vdi objects */
        s->co_recv = NULL;
        if (!is_data_obj(aio_req->oid)) {
            break;
        }
        idx = data_oid_to_idx(aio_req->oid);

        if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
            /*
             * If the object is newly created one, we need to update
             * the vdi object (metadata object).  min_dirty_data_idx
             * and max_dirty_data_idx are changed to include updated
             * index between them.
             */
            if (rsp.result == SD_RES_SUCCESS) {
                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
                s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
                s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
            }
            /*
             * Some requests may be blocked because simultaneous
             * create requests are not allowed, so we search the
             * pending requests here.
             */
            send_pending_req(s, aio_req->oid);
        }
        break;
    case AIOCB_READ_UDATA:
        /* copy the payload straight into the caller's iovec */
        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
                            aio_req->iov_offset, rsp.data_length);
        if (ret != rsp.data_length) {
            error_report("failed to get the data, %s", strerror(errno));
            goto out;
        }
        break;
    case AIOCB_FLUSH_CACHE:
        if (rsp.result == SD_RES_INVALID_PARMS) {
            DPRINTF("disable cache since the server doesn't support it\n");
            s->cache_flags = SD_FLAG_CMD_DIRECT;
            rsp.result = SD_RES_SUCCESS;
        }
        break;
    case AIOCB_DISCARD_OBJ:
        switch (rsp.result) {
        case SD_RES_INVALID_PARMS:
            /* not an error for the caller: just stop using discard */
            error_report("sheep(%s) doesn't support discard command",
                         s->host_spec);
            rsp.result = SD_RES_SUCCESS;
            s->discard_supported = false;
            break;
        case SD_RES_SUCCESS:
            /* the object is gone: clear its slot in the inode */
            idx = data_oid_to_idx(aio_req->oid);
            s->inode.data_vdi_id[idx] = 0;
            break;
        default:
            break;
        }
    }

    switch (rsp.result) {
    case SD_RES_SUCCESS:
        break;
    case SD_RES_READONLY:
        /* the request stays alive if it could be resent */
        ret = resend_aioreq(s, aio_req);
        if (ret == SD_RES_SUCCESS) {
            goto out;
        }
        /* fall through */
    default:
        acb->ret = -EIO;
        error_report("%s", sd_strerror(rsp.result));
        break;
    }

    free_aio_req(s, aio_req);
    if (!acb->nr_pending) {
        /*
         * We've finished all requests which belong to the AIOCB, so
         * we can switch back to sd_co_readv/writev now.
         */
        acb->aio_done_func(acb);
    }
out:
    s->co_recv = NULL;
}
773

    
774
static void co_read_response(void *opaque)
775
{
776
    BDRVSheepdogState *s = opaque;
777

    
778
    if (!s->co_recv) {
779
        s->co_recv = qemu_coroutine_create(aio_read_response);
780
    }
781

    
782
    qemu_coroutine_enter(s->co_recv, opaque);
783
}
784

    
785
static void co_write_request(void *opaque)
786
{
787
    BDRVSheepdogState *s = opaque;
788

    
789
    qemu_coroutine_enter(s->co_send, NULL);
790
}
791

    
792
/*
793
 * Return a socket discriptor to read/write objects.
794
 *
795
 * We cannot use this discriptor for other operations because
796
 * the block driver may be on waiting response from the server.
797
 */
798
static int get_sheep_fd(BDRVSheepdogState *s)
799
{
800
    int fd;
801

    
802
    fd = connect_to_sdog(s);
803
    if (fd < 0) {
804
        return fd;
805
    }
806

    
807
    qemu_aio_set_fd_handler(fd, co_read_response, NULL, s);
808
    return fd;
809
}
810

    
811
/*
 * Parse a sheepdog URI.
 *
 * Accepted forms:
 *   sheepdog[+tcp]://[host:port]/vdiname[#snapid|#tag]
 *   sheepdog+unix:///vdiname?socket=path[#snapid|#tag]
 *
 * On success s->is_unix and s->host_spec (newly allocated) are set, the
 * vdi name is copied to `vdi' (at most SD_MAX_VDI_LEN bytes), and the
 * fragment, if any, is stored either as a number in *snapid or, when it
 * does not parse to a non-zero number, as a string in `tag' (at most
 * SD_MAX_VDI_TAG_LEN bytes).  Without a fragment *snapid is
 * CURRENT_VDI_ID and `tag' is left untouched.
 *
 * Returns 0 on success and -EINVAL when the URI cannot be parsed, has
 * no or an unknown scheme, has no vdi name, or has query parameters
 * that do not fit the transport.
 */
static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
                        char *vdi, uint32_t *snapid, char *tag)
{
    URI *uri;
    QueryParams *qp = NULL;
    int ret = 0;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* a URI without a scheme must not reach strcmp() below */
    if (!uri->scheme) {
        ret = -EINVAL;
        goto out;
    }

    /* transport */
    if (!strcmp(uri->scheme, "sheepdog")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
        s->is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    if (uri->path == NULL || !strcmp(uri->path, "/")) {
        ret = -EINVAL;
        goto out;
    }
    /* skip the leading '/' of the path */
    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);

    /* unix needs exactly one query parameter, tcp takes none */
    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (s->is_unix) {
        /* sheepdog+unix:///vdiname?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        s->host_spec = g_strdup(qp->p[0].value);
    } else {
        /* sheepdog[+tcp]://[host:port]/vdiname */
        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
                                       uri->port ?: SD_DEFAULT_PORT);
    }

    /* snapshot tag */
    if (uri->fragment) {
        *snapid = strtoul(uri->fragment, NULL, 10);
        if (*snapid == 0) {
            /* not a usable number: treat the fragment as a tag name */
            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
        }
    } else {
        *snapid = CURRENT_VDI_ID; /* search current vdi */
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
877

    
878
/*
879
 * Parse a filename (old syntax)
880
 *
881
 * filename must be one of the following formats:
882
 *   1. [vdiname]
883
 *   2. [vdiname]:[snapid]
884
 *   3. [vdiname]:[tag]
885
 *   4. [hostname]:[port]:[vdiname]
886
 *   5. [hostname]:[port]:[vdiname]:[snapid]
887
 *   6. [hostname]:[port]:[vdiname]:[tag]
888
 *
889
 * You can boot from the snapshot images by specifying `snapid` or
890
 * `tag'.
891
 *
892
 * You can run VMs outside the Sheepdog cluster by specifying
893
 * `hostname' and `port' (experimental).
894
 */
895
/*
 * Convert an old-syntax filename into a "sheepdog://" URI and let
 * sd_parse_uri() do the real work.  An optional "sheepdog:" prefix is
 * skipped.  With two or more ':' separators the first two tokens are
 * taken as "host:port"; the first ':' in the remainder is replaced by
 * '#', so that the snapshot id or tag becomes the URI fragment.
 *
 * Returns whatever sd_parse_uri() returns.
 */
static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
                         char *vdi, uint32_t *snapid, char *tag)
{
    char *copy, *cursor, *sep, *uri;
    const char *host_spec = "";
    const char *vdi_spec;
    int nr_sep = 0;
    int ret;

    strstart(filename, "sheepdog:", (const char **)&filename);
    copy = g_strdup(filename);

    /* count the number of separators */
    for (cursor = copy; *cursor != '\0'; cursor++) {
        if (*cursor == ':') {
            nr_sep++;
        }
    }

    /* use the first two tokens as host_spec. */
    cursor = copy;
    if (nr_sep >= 2) {
        host_spec = copy;
        sep = strchr(copy, ':');
        sep = strchr(sep + 1, ':');
        *sep = '\0';
        cursor = sep + 1;
    }
    vdi_spec = cursor;

    /* "vdiname:snap" becomes "vdiname#snap" */
    sep = strchr(cursor, ':');
    if (sep != NULL) {
        *sep = '#';
    }

    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
    ret = sd_parse_uri(s, uri, vdi, snapid, tag);

    g_free(uri);
    g_free(copy);

    return ret;
}
942

    
943
/*
 * Look up the vdi `filename' with snapshot id `snapid' / tag `tag' on
 * the server and store its id in *vid.  With `lock' set the request is
 * SD_OP_LOCK_VDI, otherwise SD_OP_GET_VDI_INFO.
 *
 * A dedicated connection is opened for the request and closed again
 * before returning.
 *
 * Returns 0 on success, the negative value from connect_to_sdog() when
 * connecting fails, the non-zero result of do_req() when the exchange
 * fails, -ENOENT when the server reports SD_RES_NO_VDI and -EIO for any
 * other server-side error.
 */
static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
                         uint32_t snapid, const char *tag, uint32_t *vid,
                         bool lock)
{
    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
    SheepdogVdiReq hdr;
    /* the response is received into the same buffer as the request */
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
    unsigned int rlen = 0;
    int fd;
    int ret;

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    /* This pair of strncpy calls ensures that the buffer is zero-filled,
     * which is desirable since we'll soon be sending those bytes, and
     * don't want the send_req to read uninitialized data.
     */
    strncpy(buf, filename, SD_MAX_VDI_LEN);
    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);

    memset(&hdr, 0, sizeof(hdr));
    hdr.proto_ver = SD_PROTO_VER;
    hdr.opcode = lock ? SD_OP_LOCK_VDI : SD_OP_GET_VDI_INFO;
    hdr.flags = SD_FLAG_CMD_WRITE;
    hdr.data_length = wlen;
    hdr.snapid = snapid;

    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
    if (ret == 0) {
        if (rsp->result == SD_RES_SUCCESS) {
            *vid = rsp->vdi_id;
        } else {
            error_report("cannot get vdi info, %s, %s %d %s",
                         sd_strerror(rsp->result), filename, snapid, tag);
            ret = (rsp->result == SD_RES_NO_VDI) ? -ENOENT : -EIO;
        }
    }

    closesocket(fd);
    return ret;
}
999

    
1000
/*
 * Build the object request header for @aio_req and send it on s->fd.
 *
 * @aiocb_type selects the opcode.  For AIOCB_WRITE_UDATA, @create chooses
 * between SD_OP_CREATE_AND_WRITE_OBJ and SD_OP_WRITE_OBJ, and the payload
 * is taken from @iov / @niov starting at aio_req->iov_offset.  The other
 * request types send the header only.
 *
 * The send is done with s->lock held and the socket corked; while it is in
 * progress co_write_request is installed as the write handler of s->fd.
 * Whether the send succeeds or fails, the socket is uncorked and the write
 * handler removed again before the lock is released.
 *
 * Returns 0 on success or a negative errno value if sending failed.
 */
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                           struct iovec *iov, int niov, bool create,
                           enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    int saved_errno;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;

    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        hdr.opcode = SD_OP_DISCARD_OBJ;
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret < 0) {
        /* capture errno before any other call gets a chance to change it */
        saved_errno = errno;
        error_report("failed to send a req, %s", strerror(saved_errno));
        ret = saved_errno ? -saved_errno : -EIO;
        goto out;
    }

    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret < 0) {
            saved_errno = errno;
            error_report("failed to send a data, %s", strerror(saved_errno));
            ret = saved_errno ? -saved_errno : -EIO;
            goto out;
        }
    }

    ret = 0;
out:
    /*
     * Undo the send-time socket setup on every path so a failed send does
     * not leave the socket corked or the write handler pointing at this
     * coroutine.
     */
    socket_set_cork(s->fd, 0);
    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}
1083

    
1084
static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
1085
                             unsigned int datalen, uint64_t offset,
1086
                             bool write, bool create, uint32_t cache_flags)
1087
{
1088
    SheepdogObjReq hdr;
1089
    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1090
    unsigned int wlen, rlen;
1091
    int ret;
1092

    
1093
    memset(&hdr, 0, sizeof(hdr));
1094

    
1095
    if (write) {
1096
        wlen = datalen;
1097
        rlen = 0;
1098
        hdr.flags = SD_FLAG_CMD_WRITE;
1099
        if (create) {
1100
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1101
        } else {
1102
            hdr.opcode = SD_OP_WRITE_OBJ;
1103
        }
1104
    } else {
1105
        wlen = 0;
1106
        rlen = datalen;
1107
        hdr.opcode = SD_OP_READ_OBJ;
1108
    }
1109

    
1110
    hdr.flags |= cache_flags;
1111

    
1112
    hdr.oid = oid;
1113
    hdr.data_length = datalen;
1114
    hdr.offset = offset;
1115
    hdr.copies = copies;
1116

    
1117
    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1118
    if (ret) {
1119
        error_report("failed to send a request to the sheep");
1120
        return ret;
1121
    }
1122

    
1123
    switch (rsp->result) {
1124
    case SD_RES_SUCCESS:
1125
        return 0;
1126
    default:
1127
        error_report("%s", sd_strerror(rsp->result));
1128
        return -EIO;
1129
    }
1130
}
1131

    
1132
/*
 * Read @datalen bytes of object @oid at @offset into @buf.
 * Thin wrapper around read_write_object() with write and create disabled.
 */
static int read_object(int fd, char *buf, uint64_t oid, int copies,
                       unsigned int datalen, uint64_t offset,
                       uint32_t cache_flags)
{
    const bool is_write = false;
    const bool do_create = false;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, do_create, cache_flags);
}
1139

    
1140
/*
 * Write @datalen bytes from @buf to object @oid at @offset.
 * Thin wrapper around read_write_object() with write enabled; @create is
 * forwarded unchanged.
 */
static int write_object(int fd, char *buf, uint64_t oid, int copies,
                        unsigned int datalen, uint64_t offset, bool create,
                        uint32_t cache_flags)
{
    const bool is_write = true;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, create, cache_flags);
}
1147

    
1148
/* update inode with the latest state */
1149
static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1150
{
1151
    SheepdogInode *inode;
1152
    int ret = 0, fd;
1153
    uint32_t vid = 0;
1154

    
1155
    fd = connect_to_sdog(s);
1156
    if (fd < 0) {
1157
        return -EIO;
1158
    }
1159

    
1160
    inode = g_malloc(sizeof(s->inode));
1161

    
1162
    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
1163
    if (ret) {
1164
        goto out;
1165
    }
1166

    
1167
    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
1168
                      s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
1169
    if (ret < 0) {
1170
        goto out;
1171
    }
1172

    
1173
    if (inode->vdi_id != s->inode.vdi_id) {
1174
        memcpy(&s->inode, inode, sizeof(s->inode));
1175
    }
1176

    
1177
out:
1178
    g_free(inode);
1179
    closesocket(fd);
1180

    
1181
    return ret;
1182
}
1183

    
1184
/*
 * Re-issue @aio_req after refreshing the cached inode.
 *
 * The request's oid is rebuilt from the reloaded inode's vdi_id.  For
 * write requests the function then decides whether the target object has
 * to be created, either because it is unallocated or because it is not
 * writable and must be copied (SD_FLAG_CMD_COW).  If another in-flight
 * request already targets the same object, the request is moved to the
 * pending list instead of being sent.
 *
 * Returns a negative value if reload_inode() fails, SD_RES_SUCCESS when
 * the request was parked on the pending list, and otherwise the result of
 * add_aio_request().
 */
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
{
    SheepdogAIOCB *acb = aio_req->aiocb;
    bool need_create = false;
    int err;

    err = reload_inode(s, 0, "");
    if (err < 0) {
        return err;
    }

    aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
                                   data_oid_to_idx(aio_req->oid));

    /* check whether this request becomes a CoW one */
    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
        int idx = data_oid_to_idx(aio_req->oid);

        if (s->inode.data_vdi_id[idx] == 0) {
            /* object not allocated yet: plain create */
            need_create = true;
        } else if (!is_data_obj_writable(&s->inode, idx)) {
            AIOReq *other;

            /* link to the pending list if there is another CoW request to
             * the same object */
            QLIST_FOREACH(other, &s->inflight_aio_head, aio_siblings) {
                if (other != aio_req && other->oid == aio_req->oid) {
                    DPRINTF("simultaneous CoW to %" PRIx64 "\n", aio_req->oid);
                    QLIST_REMOVE(aio_req, aio_siblings);
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
                                      aio_siblings);
                    return SD_RES_SUCCESS;
                }
            }

            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx],
                                                idx);
            aio_req->flags |= SD_FLAG_CMD_COW;
            need_create = true;
        }
    }

    return add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                           need_create, acb->aiocb_type);
}
1230

    
1231
/* TODO Convert to fine grained options */
1232
static QemuOptsList runtime_opts = {
1233
    .name = "sheepdog",
1234
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1235
    .desc = {
1236
        {
1237
            .name = "filename",
1238
            .type = QEMU_OPT_STRING,
1239
            .help = "URL to the sheepdog image",
1240
        },
1241
        { /* end of list */ }
1242
    },
1243
};
1244

    
1245
static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1246
                   Error **errp)
1247
{
1248
    int ret, fd;
1249
    uint32_t vid = 0;
1250
    BDRVSheepdogState *s = bs->opaque;
1251
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1252
    uint32_t snapid;
1253
    char *buf = NULL;
1254
    QemuOpts *opts;
1255
    Error *local_err = NULL;
1256
    const char *filename;
1257

    
1258
    opts = qemu_opts_create_nofail(&runtime_opts);
1259
    qemu_opts_absorb_qdict(opts, options, &local_err);
1260
    if (error_is_set(&local_err)) {
1261
        qerror_report_err(local_err);
1262
        error_free(local_err);
1263
        ret = -EINVAL;
1264
        goto out;
1265
    }
1266

    
1267
    filename = qemu_opt_get(opts, "filename");
1268

    
1269
    QLIST_INIT(&s->inflight_aio_head);
1270
    QLIST_INIT(&s->pending_aio_head);
1271
    s->fd = -1;
1272

    
1273
    memset(vdi, 0, sizeof(vdi));
1274
    memset(tag, 0, sizeof(tag));
1275

    
1276
    if (strstr(filename, "://")) {
1277
        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1278
    } else {
1279
        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1280
    }
1281
    if (ret < 0) {
1282
        goto out;
1283
    }
1284
    s->fd = get_sheep_fd(s);
1285
    if (s->fd < 0) {
1286
        ret = s->fd;
1287
        goto out;
1288
    }
1289

    
1290
    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
1291
    if (ret) {
1292
        goto out;
1293
    }
1294

    
1295
    /*
1296
     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1297
     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1298
     */
1299
    s->cache_flags = SD_FLAG_CMD_CACHE;
1300
    if (flags & BDRV_O_NOCACHE) {
1301
        s->cache_flags = SD_FLAG_CMD_DIRECT;
1302
    }
1303
    s->discard_supported = true;
1304

    
1305
    if (snapid || tag[0] != '\0') {
1306
        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1307
        s->is_snapshot = true;
1308
    }
1309

    
1310
    fd = connect_to_sdog(s);
1311
    if (fd < 0) {
1312
        ret = fd;
1313
        goto out;
1314
    }
1315

    
1316
    buf = g_malloc(SD_INODE_SIZE);
1317
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
1318
                      s->cache_flags);
1319

    
1320
    closesocket(fd);
1321

    
1322
    if (ret) {
1323
        goto out;
1324
    }
1325

    
1326
    memcpy(&s->inode, buf, sizeof(s->inode));
1327
    s->min_dirty_data_idx = UINT32_MAX;
1328
    s->max_dirty_data_idx = 0;
1329

    
1330
    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1331
    pstrcpy(s->name, sizeof(s->name), vdi);
1332
    qemu_co_mutex_init(&s->lock);
1333
    qemu_opts_del(opts);
1334
    g_free(buf);
1335
    return 0;
1336
out:
1337
    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1338
    if (s->fd >= 0) {
1339
        closesocket(s->fd);
1340
    }
1341
    qemu_opts_del(opts);
1342
    g_free(buf);
1343
    return ret;
1344
}
1345

    
1346
/*
 * Ask the server to create a new VDI named @filename of @vdi_size bytes.
 *
 * @base_vid is sent as the request's vdi_id and @snapshot as its snapid.
 * The name travels in a zero-padded SD_MAX_VDI_LEN byte field.
 *
 * Returns 0 on success and, if @vdi_id is non-NULL, stores the id from the
 * response there.  Otherwise returns the negative value from
 * connect_to_sdog(), the non-zero value from do_req(), or -EIO when the
 * server reports a failure.
 */
static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
                        uint32_t base_vid, uint32_t *vdi_id, int snapshot)
{
    SheepdogVdiReq req;
    /* the response is read back through the same storage as the request */
    SheepdogVdiRsp *reply = (SheepdogVdiRsp *)&req;
    unsigned int send_len = SD_MAX_VDI_LEN, recv_len = 0;
    char name[SD_MAX_VDI_LEN];
    int sock, err;

    sock = connect_to_sdog(s);
    if (sock < 0) {
        return sock;
    }

    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
     */
    memset(name, 0, sizeof(name));
    pstrcpy(name, sizeof(name), filename);

    memset(&req, 0, sizeof(req));
    req.opcode = SD_OP_NEW_VDI;
    req.flags = SD_FLAG_CMD_WRITE;
    req.vdi_id = base_vid;
    req.snapid = snapshot;
    req.vdi_size = vdi_size;
    req.data_length = send_len;

    err = do_req(sock, (SheepdogReq *)&req, name, &send_len, &recv_len);

    closesocket(sock);

    if (err) {
        return err;
    }

    if (reply->result != SD_RES_SUCCESS) {
        error_report("%s, %s", sd_strerror(reply->result), filename);
        return -EIO;
    }

    if (vdi_id) {
        *vdi_id = reply->vdi_id;
    }

    return 0;
}
1397

    
1398
static int sd_prealloc(const char *filename)
1399
{
1400
    BlockDriverState *bs = NULL;
1401
    uint32_t idx, max_idx;
1402
    int64_t vdi_size;
1403
    void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1404
    int ret;
1405

    
1406
    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR);
1407
    if (ret < 0) {
1408
        goto out;
1409
    }
1410

    
1411
    vdi_size = bdrv_getlength(bs);
1412
    if (vdi_size < 0) {
1413
        ret = vdi_size;
1414
        goto out;
1415
    }
1416
    max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1417

    
1418
    for (idx = 0; idx < max_idx; idx++) {
1419
        /*
1420
         * The created image can be a cloned image, so we need to read
1421
         * a data from the source image.
1422
         */
1423
        ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1424
        if (ret < 0) {
1425
            goto out;
1426
        }
1427
        ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1428
        if (ret < 0) {
1429
            goto out;
1430
        }
1431
    }
1432
out:
1433
    if (bs) {
1434
        bdrv_unref(bs);
1435
    }
1436
    g_free(buf);
1437

    
1438
    return ret;
1439
}
1440

    
1441
static int sd_create(const char *filename, QEMUOptionParameter *options,
1442
                     Error **errp)
1443
{
1444
    int ret = 0;
1445
    uint32_t vid = 0, base_vid = 0;
1446
    int64_t vdi_size = 0;
1447
    char *backing_file = NULL;
1448
    BDRVSheepdogState *s;
1449
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1450
    uint32_t snapid;
1451
    bool prealloc = false;
1452

    
1453
    s = g_malloc0(sizeof(BDRVSheepdogState));
1454

    
1455
    memset(vdi, 0, sizeof(vdi));
1456
    memset(tag, 0, sizeof(tag));
1457
    if (strstr(filename, "://")) {
1458
        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1459
    } else {
1460
        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1461
    }
1462
    if (ret < 0) {
1463
        goto out;
1464
    }
1465

    
1466
    while (options && options->name) {
1467
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
1468
            vdi_size = options->value.n;
1469
        } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
1470
            backing_file = options->value.s;
1471
        } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
1472
            if (!options->value.s || !strcmp(options->value.s, "off")) {
1473
                prealloc = false;
1474
            } else if (!strcmp(options->value.s, "full")) {
1475
                prealloc = true;
1476
            } else {
1477
                error_report("Invalid preallocation mode: '%s'",
1478
                             options->value.s);
1479
                ret = -EINVAL;
1480
                goto out;
1481
            }
1482
        }
1483
        options++;
1484
    }
1485

    
1486
    if (vdi_size > SD_MAX_VDI_SIZE) {
1487
        error_report("too big image size");
1488
        ret = -EINVAL;
1489
        goto out;
1490
    }
1491

    
1492
    if (backing_file) {
1493
        BlockDriverState *bs;
1494
        BDRVSheepdogState *s;
1495
        BlockDriver *drv;
1496

    
1497
        /* Currently, only Sheepdog backing image is supported. */
1498
        drv = bdrv_find_protocol(backing_file, true);
1499
        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1500
            error_report("backing_file must be a sheepdog image");
1501
            ret = -EINVAL;
1502
            goto out;
1503
        }
1504

    
1505
        ret = bdrv_file_open(&bs, backing_file, NULL, 0);
1506
        if (ret < 0) {
1507
            goto out;
1508
        }
1509

    
1510
        s = bs->opaque;
1511

    
1512
        if (!is_snapshot(&s->inode)) {
1513
            error_report("cannot clone from a non snapshot vdi");
1514
            bdrv_unref(bs);
1515
            ret = -EINVAL;
1516
            goto out;
1517
        }
1518

    
1519
        base_vid = s->inode.vdi_id;
1520
        bdrv_unref(bs);
1521
    }
1522

    
1523
    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0);
1524
    if (!prealloc || ret) {
1525
        goto out;
1526
    }
1527

    
1528
    ret = sd_prealloc(filename);
1529
out:
1530
    g_free(s);
1531
    return ret;
1532
}
1533

    
1534
/*
 * Close the VDI: tell the server to release it, then tear down the local
 * connection state.
 *
 * The release request (SD_OP_RELEASE_VDI) is sent over a fresh connection
 * and is best effort.  If that connection cannot be made the request is
 * skipped, but the I/O socket in s->fd, its fd handler and s->host_spec
 * are still released.
 */
static void sd_close(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogVdiReq hdr;
    /* the response is read back through the same storage as the request */
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen, rlen = 0;
    int fd, ret;

    DPRINTF("%s\n", s->name);

    fd = connect_to_sdog(s);
    if (fd >= 0) {
        memset(&hdr, 0, sizeof(hdr));

        hdr.opcode = SD_OP_RELEASE_VDI;
        hdr.vdi_id = s->inode.vdi_id;
        /* the payload is the VDI name including its NUL terminator */
        wlen = strlen(s->name) + 1;
        hdr.data_length = wlen;
        hdr.flags = SD_FLAG_CMD_WRITE;

        ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);

        closesocket(fd);

        /* SD_RES_VDI_NOT_LOCKED is not treated as an error here */
        if (!ret && rsp->result != SD_RES_SUCCESS &&
            rsp->result != SD_RES_VDI_NOT_LOCKED) {
            error_report("%s, %s", sd_strerror(rsp->result), s->name);
        }
    }

    /* local teardown happens whether or not the release could be sent */
    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
    closesocket(s->fd);
    g_free(s->host_spec);
}
1570

    
1571
/* Return the VDI size in bytes as recorded in the cached inode. */
static int64_t sd_getlength(BlockDriverState *bs)
{
    BDRVSheepdogState *state = bs->opaque;
    int64_t length = state->inode.vdi_size;

    return length;
}
1577

    
1578
/*
 * Grow the VDI to @offset bytes.
 *
 * Shrinking is rejected, as are sizes above SD_MAX_VDI_SIZE (both
 * -EINVAL).  The new size is stored in the cached inode and the inode
 * header, i.e. everything except the data_vdi_id table, is written back
 * to the server.
 *
 * Returns the result of write_object(), or the negative value from
 * connect_to_sdog() if no connection could be made.
 */
static int sd_truncate(BlockDriverState *bs, int64_t offset)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    unsigned int datalen;

    if (offset < s->inode.vdi_size) {
        error_report("shrinking is not supported");
        return -EINVAL;
    } else if (offset > SD_MAX_VDI_SIZE) {
        error_report("too big image size");
        return -EINVAL;
    }

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
    s->inode.vdi_size = offset;
    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    /* fd is a socket: close it the same way as everywhere else in the file */
    closesocket(fd);

    if (ret < 0) {
        error_report("failed to update an inode.");
    }

    return ret;
}
1610

    
1611
/*
1612
 * This function is called after writing data objects.  If we need to
1613
 * update metadata, this sends a write request to the vdi object.
1614
 * Otherwise, this switches back to sd_co_readv/writev.
1615
 */
1616
static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1617
{
1618
    int ret;
1619
    BDRVSheepdogState *s = acb->common.bs->opaque;
1620
    struct iovec iov;
1621
    AIOReq *aio_req;
1622
    uint32_t offset, data_len, mn, mx;
1623

    
1624
    mn = s->min_dirty_data_idx;
1625
    mx = s->max_dirty_data_idx;
1626
    if (mn <= mx) {
1627
        /* we need to update the vdi object. */
1628
        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1629
            mn * sizeof(s->inode.data_vdi_id[0]);
1630
        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1631

    
1632
        s->min_dirty_data_idx = UINT32_MAX;
1633
        s->max_dirty_data_idx = 0;
1634

    
1635
        iov.iov_base = &s->inode;
1636
        iov.iov_len = sizeof(s->inode);
1637
        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1638
                                data_len, offset, 0, 0, offset);
1639
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1640
        ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1641
        if (ret) {
1642
            free_aio_req(s, aio_req);
1643
            acb->ret = -EIO;
1644
            goto out;
1645
        }
1646

    
1647
        acb->aio_done_func = sd_finish_aiocb;
1648
        acb->aiocb_type = AIOCB_WRITE_UDATA;
1649
        return;
1650
    }
1651
out:
1652
    sd_finish_aiocb(acb);
1653
}
1654

    
1655
/* Delete current working VDI on the snapshot chain */
1656
static bool sd_delete(BDRVSheepdogState *s)
1657
{
1658
    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1659
    SheepdogVdiReq hdr = {
1660
        .opcode = SD_OP_DEL_VDI,
1661
        .vdi_id = s->inode.vdi_id,
1662
        .data_length = wlen,
1663
        .flags = SD_FLAG_CMD_WRITE,
1664
    };
1665
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1666
    int fd, ret;
1667

    
1668
    fd = connect_to_sdog(s);
1669
    if (fd < 0) {
1670
        return false;
1671
    }
1672

    
1673
    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1674
    closesocket(fd);
1675
    if (ret) {
1676
        return false;
1677
    }
1678
    switch (rsp->result) {
1679
    case SD_RES_NO_VDI:
1680
        error_report("%s was already deleted", s->name);
1681
        /* fall through */
1682
    case SD_RES_SUCCESS:
1683
        break;
1684
    default:
1685
        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1686
        return false;
1687
    }
1688

    
1689
    return true;
1690
}
1691

    
1692
/*
1693
 * Create a writable VDI from a snapshot
1694
 */
1695
static int sd_create_branch(BDRVSheepdogState *s)
1696
{
1697
    int ret, fd;
1698
    uint32_t vid;
1699
    char *buf;
1700
    bool deleted;
1701

    
1702
    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1703

    
1704
    buf = g_malloc(SD_INODE_SIZE);
1705

    
1706
    /*
1707
     * Even If deletion fails, we will just create extra snapshot based on
1708
     * the workding VDI which was supposed to be deleted. So no need to
1709
     * false bail out.
1710
     */
1711
    deleted = sd_delete(s);
1712
    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
1713
                       !deleted);
1714
    if (ret) {
1715
        goto out;
1716
    }
1717

    
1718
    DPRINTF("%" PRIx32 " is created.\n", vid);
1719

    
1720
    fd = connect_to_sdog(s);
1721
    if (fd < 0) {
1722
        ret = fd;
1723
        goto out;
1724
    }
1725

    
1726
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1727
                      SD_INODE_SIZE, 0, s->cache_flags);
1728

    
1729
    closesocket(fd);
1730

    
1731
    if (ret < 0) {
1732
        goto out;
1733
    }
1734

    
1735
    memcpy(&s->inode, buf, sizeof(s->inode));
1736

    
1737
    s->is_snapshot = false;
1738
    ret = 0;
1739
    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1740

    
1741
out:
1742
    g_free(buf);
1743

    
1744
    return ret;
1745
}
1746

    
1747
/*
1748
 * Send I/O requests to the server.
1749
 *
1750
 * This function sends requests to the server, links the requests to
1751
 * the inflight_list in BDRVSheepdogState, and exits without
1752
 * waiting the response.  The responses are received in the
1753
 * `aio_read_response' function which is called from the main loop as
1754
 * a fd handler.
1755
 *
1756
 * Returns 1 when we need to wait a response, 0 when there is no sent
1757
 * request and -errno in error cases.
1758
 */
1759
static int coroutine_fn sd_co_rw_vector(void *p)
1760
{
1761
    SheepdogAIOCB *acb = p;
1762
    int ret = 0;
1763
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
1764
    unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1765
    uint64_t oid;
1766
    uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1767
    BDRVSheepdogState *s = acb->common.bs->opaque;
1768
    SheepdogInode *inode = &s->inode;
1769
    AIOReq *aio_req;
1770

    
1771
    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1772
        /*
1773
         * In the case we open the snapshot VDI, Sheepdog creates the
1774
         * writable VDI when we do a write operation first.
1775
         */
1776
        ret = sd_create_branch(s);
1777
        if (ret) {
1778
            acb->ret = -EIO;
1779
            goto out;
1780
        }
1781
    }
1782

    
1783
    /*
1784
     * Make sure we don't free the aiocb before we are done with all requests.
1785
     * This additional reference is dropped at the end of this function.
1786
     */
1787
    acb->nr_pending++;
1788

    
1789
    while (done != total) {
1790
        uint8_t flags = 0;
1791
        uint64_t old_oid = 0;
1792
        bool create = false;
1793

    
1794
        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1795

    
1796
        len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1797

    
1798
        switch (acb->aiocb_type) {
1799
        case AIOCB_READ_UDATA:
1800
            if (!inode->data_vdi_id[idx]) {
1801
                qemu_iovec_memset(acb->qiov, done, 0, len);
1802
                goto done;
1803
            }
1804
            break;
1805
        case AIOCB_WRITE_UDATA:
1806
            if (!inode->data_vdi_id[idx]) {
1807
                create = true;
1808
            } else if (!is_data_obj_writable(inode, idx)) {
1809
                /* Copy-On-Write */
1810
                create = true;
1811
                old_oid = oid;
1812
                flags = SD_FLAG_CMD_COW;
1813
            }
1814
            break;
1815
        case AIOCB_DISCARD_OBJ:
1816
            /*
1817
             * We discard the object only when the whole object is
1818
             * 1) allocated 2) trimmed. Otherwise, simply skip it.
1819
             */
1820
            if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
1821
                goto done;
1822
            }
1823
            break;
1824
        default:
1825
            break;
1826
        }
1827

    
1828
        if (create) {
1829
            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
1830
                    inode->vdi_id, oid,
1831
                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1832
            oid = vid_to_data_oid(inode->vdi_id, idx);
1833
            DPRINTF("new oid %" PRIx64 "\n", oid);
1834
        }
1835

    
1836
        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1837

    
1838
        if (create) {
1839
            AIOReq *areq;
1840
            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1841
                if (areq->oid == oid) {
1842
                    /*
1843
                     * Sheepdog cannot handle simultaneous create
1844
                     * requests to the same object.  So we cannot send
1845
                     * the request until the previous request
1846
                     * finishes.
1847
                     */
1848
                    aio_req->flags = 0;
1849
                    aio_req->base_oid = 0;
1850
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
1851
                                      aio_siblings);
1852
                    goto done;
1853
                }
1854
            }
1855
        }
1856

    
1857
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1858
        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1859
                              create, acb->aiocb_type);
1860
        if (ret < 0) {
1861
            error_report("add_aio_request is failed");
1862
            free_aio_req(s, aio_req);
1863
            acb->ret = -EIO;
1864
            goto out;
1865
        }
1866
    done:
1867
        offset = 0;
1868
        idx++;
1869
        done += len;
1870
    }
1871
out:
1872
    if (!--acb->nr_pending) {
1873
        return acb->ret;
1874
    }
1875
    return 1;
1876
}
1877

    
1878
/*
 * Write @nb_sectors sectors starting at @sector_num from @qiov.
 *
 * If the image is growable and the write ends past the current end, the
 * VDI is extended with sd_truncate() first.  The request is then issued
 * through sd_co_rw_vector(); if anything is in flight the coroutine yields
 * and returns acb->ret once it is resumed.
 */
static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
                        int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb;
    int rc;

    if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
        int64_t end_sector = sector_num + nb_sectors;

        rc = sd_truncate(bs, end_sector * BDRV_SECTOR_SIZE);
        if (rc < 0) {
            return rc;
        }
        bs->total_sectors = end_sector;
    }

    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    acb->aiocb_type = AIOCB_WRITE_UDATA;
    acb->aio_done_func = sd_write_done;

    rc = sd_co_rw_vector(acb);
    if (rc > 0) {
        /* requests are in flight: wait until the coroutine is resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    /* nothing was sent, or sending failed: we still own the acb */
    qemu_aio_release(acb);
    return rc;
}
1906

    
1907
/*
 * Read @nb_sectors sectors starting at @sector_num into @qiov.
 *
 * The request is issued through sd_co_rw_vector(); if anything is in
 * flight the coroutine yields and returns acb->ret once it is resumed.
 */
static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
                       int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    int rc;

    acb->aiocb_type = AIOCB_READ_UDATA;
    acb->aio_done_func = sd_finish_aiocb;

    rc = sd_co_rw_vector(acb);
    if (rc > 0) {
        /* requests are in flight: wait until the coroutine is resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    /* nothing was sent, or sending failed: we still own the acb */
    qemu_aio_release(acb);
    return rc;
}
1927

    
1928
/*
 * Flush the VDI on the server.
 *
 * This is a no-op returning 0 unless cache_flags is exactly
 * SD_FLAG_CMD_CACHE.  Otherwise one AIOCB_FLUSH_CACHE request for the vdi
 * object is queued and the coroutine yields; acb->ret is returned once it
 * is resumed.  If the request cannot be sent, the error from
 * add_aio_request() is returned.
 */
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogAIOCB *acb;
    AIOReq *flush_req;
    int rc;

    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
        return 0;
    }

    acb = sd_aio_setup(bs, NULL, 0, 0);
    acb->aio_done_func = sd_finish_aiocb;
    acb->aiocb_type = AIOCB_FLUSH_CACHE;

    flush_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                              0, 0, 0, 0, 0);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, flush_req, aio_siblings);

    rc = add_aio_request(s, flush_req, NULL, 0, false, AIOCB_FLUSH_CACHE);
    if (rc >= 0) {
        qemu_coroutine_yield();
        return acb->ret;
    }

    error_report("add_aio_request is failed");
    free_aio_req(s, flush_req);
    qemu_aio_release(acb);
    return rc;
}
1957

    
1958
/*
 * Create a snapshot of the current working VDI.
 *
 * Steps, as performed below:
 *   1. copy vm_state_size, vm_clock_nsec and the snapshot name (as tag)
 *      from sn_info into s->inode;
 *   2. write the inode header (everything except data_vdi_id) back to the
 *      VDI object;
 *   3. call do_sd_create() with the snapshot flag set, which yields new_vid;
 *   4. read the header of the inode identified by new_vid and copy it over
 *      s->inode.
 *
 * Returns -EINVAL if the state already refers to a snapshot, the negative
 * value from connect_to_sdog()/write_object()/do_sd_create()/read_object()
 * on failure, and otherwise the result of the final read_object() call.
 */
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    uint32_t new_vid;
    SheepdogInode *inode = NULL;
    unsigned int datalen;

    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
            s->name, sn_info->vm_state_size, s->is_snapshot);

    if (s->is_snapshot) {
        error_report("You can't create a snapshot of a snapshot VDI, "
                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);

        return -EINVAL;
    }

    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);

    s->inode.vm_state_size = sn_info->vm_state_size;
    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
    /* It appears that inode.tag does not require a NUL terminator,
     * which means this use of strncpy is ok.
     */
    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);

    /* refresh inode. */
    fd = connect_to_sdog(s);
    if (fd < 0) {
        /* No socket and no buffer exist yet, so there is nothing to clean
         * up; in particular an invalid fd must not reach closesocket().
         */
        return fd;
    }

    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    if (ret < 0) {
        error_report("failed to write snapshot's inode.");
        goto cleanup;
    }

    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
                       1);
    if (ret < 0) {
        error_report("failed to create inode for snapshot. %s",
                     strerror(errno));
        goto cleanup;
    }

    inode = g_malloc(datalen);

    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
                      s->inode.nr_copies, datalen, 0, s->cache_flags);

    if (ret < 0) {
        error_report("failed to read new inode info. %s", strerror(errno));
        goto cleanup;
    }

    memcpy(&s->inode, inode, datalen);
    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
            s->inode.name, s->inode.snap_id, s->inode.vdi_id);

cleanup:
    /* inode is still NULL when we bail out before the allocation;
     * g_free(NULL) is a no-op.
     */
    g_free(inode);
    closesocket(fd);
    return ret;
}
2028

    
2029
/*
2030
 * We implement rollback(loadvm) operation to the specified snapshot by
2031
 * 1) switch to the snapshot
2032
 * 2) rely on sd_create_branch to delete working VDI and
2033
 * 3) create a new working VDI based on the speicified snapshot
2034
 */
2035
static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2036
{
2037
    BDRVSheepdogState *s = bs->opaque;
2038
    BDRVSheepdogState *old_s;
2039
    char tag[SD_MAX_VDI_TAG_LEN];
2040
    uint32_t snapid = 0;
2041
    int ret = 0;
2042

    
2043
    old_s = g_malloc(sizeof(BDRVSheepdogState));
2044

    
2045
    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2046

    
2047
    snapid = strtoul(snapshot_id, NULL, 10);
2048
    if (snapid) {
2049
        tag[0] = 0;
2050
    } else {
2051
        pstrcpy(tag, sizeof(tag), snapshot_id);
2052
    }
2053

    
2054
    ret = reload_inode(s, snapid, tag);
2055
    if (ret) {
2056
        goto out;
2057
    }
2058

    
2059
    ret = sd_create_branch(s);
2060
    if (ret) {
2061
        goto out;
2062
    }
2063

    
2064
    g_free(old_s);
2065

    
2066
    return 0;
2067
out:
2068
    /* recover bdrv_sd_state */
2069
    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2070
    g_free(old_s);
2071

    
2072
    error_report("failed to open. recover old bdrv_sd_state.");
2073

    
2074
    return ret;
2075
}
2076

    
2077
static int sd_snapshot_delete(BlockDriverState *bs,
2078
                              const char *snapshot_id,
2079
                              const char *name,
2080
                              Error **errp)
2081
{
2082
    /* FIXME: Delete specified snapshot id.  */
2083
    return 0;
2084
}
2085

    
2086
/*
 * Enumerate the snapshots that belong to the VDI named s->name.
 *
 * First the VDI in-use bitmap is fetched with SD_OP_READ_VDIS.  Then,
 * starting at the slot derived from the FNV-1a hash of the name, VDI ids
 * are probed one by one until a slot that is not in use is reached or the
 * table (1024 entries) is full.  Every inode whose name matches and which
 * is_snapshot() accepts contributes one QEMUSnapshotInfo entry.
 *
 * On success *psn_tab receives a g_malloc0()-allocated table (NULL if the
 * table was never allocated) and the number of filled entries is returned.
 * When a negative value is returned, the table is freed here and *psn_tab
 * is set to NULL, so callers have nothing to release on failure.
 *
 * NOTE(review): an inode that fails to be read is skipped, but its error
 * code stays in 'ret'; if it was the last one probed the whole call still
 * reports that error.  Return values are deliberately left as they were --
 * confirm whether skipped reads should really fail the listing.
 */
static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogReq req;
    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
    QEMUSnapshotInfo *sn_tab = NULL;
    unsigned wlen, rlen;
    int found = 0;
    /* static: presumably to keep the inode header off the stack */
    static SheepdogInode inode;
    unsigned long *vdi_inuse;
    unsigned int start_nr;
    uint64_t hval;
    uint32_t vid;

    vdi_inuse = g_malloc(max);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    rlen = max;
    wlen = 0;

    memset(&req, 0, sizeof(req));

    req.opcode = SD_OP_READ_VDIS;
    req.data_length = max;

    ret = do_req(fd, &req, vdi_inuse, &wlen, &rlen);

    closesocket(fd);
    if (ret) {
        goto out;
    }

    sn_tab = g_malloc0(nr * sizeof(*sn_tab));

    /* calculate a vdi id with hash function */
    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
    start_nr = hval & (SD_NR_VDIS - 1);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
        if (!test_bit(vid, vdi_inuse)) {
            break;
        }

        /* we don't need to read entire object */
        ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
                          s->cache_flags);

        if (ret) {
            continue;
        }

        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
            sn_tab[found].date_sec = inode.snap_ctime >> 32;
            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
            sn_tab[found].vm_state_size = inode.vm_state_size;
            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;

            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
                     inode.snap_id);
            /* bound the copy by the smaller buffer: the tag need not be
             * NUL-terminated inside inode.tag
             */
            pstrcpy(sn_tab[found].name,
                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
                    inode.tag);
            found++;
        }
    }

    closesocket(fd);
out:
    g_free(vdi_inuse);

    if (ret < 0) {
        /* Do not hand a table to the caller together with an error code:
         * release it here so it cannot be leaked.
         */
        g_free(sn_tab);
        *psn_tab = NULL;
        return ret;
    }

    *psn_tab = sn_tab;

    return found;
}
2176

    
2177
/*
 * Transfer 'size' bytes of VM state between 'data' and the vmstate objects
 * of a VDI, starting at byte offset 'pos'.
 *
 * @s:    driver state; supplies the connection target, nr_copies and
 *        cache_flags
 * @data: buffer to fill (load) or to write out (save)
 * @pos:  byte position inside the vmstate area
 * @size: number of bytes to transfer
 * @load: nonzero to read from s->inode.parent_vdi_id, zero to write to
 *        s->inode.vdi_id
 *
 * The range is split at SD_DATA_OBJ_SIZE boundaries and one
 * read_object()/write_object() call is issued per piece.  When saving, a
 * piece that starts at offset 0 of its object is written with the create
 * flag set.
 *
 * Returns 'size' on success, or the negative value reported by
 * connect_to_sdog(), read_object() or write_object().
 */
static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
                                int64_t pos, int size, int load)
{
    bool create;
    int fd, ret = 0, remaining = size;
    unsigned int data_len;
    uint64_t vmstate_oid;
    uint64_t offset;
    uint32_t vdi_index;
    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    while (remaining) {
        vdi_index = pos / SD_DATA_OBJ_SIZE;
        offset = pos % SD_DATA_OBJ_SIZE;

        /* never cross an object boundary within one request */
        data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);

        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);

        create = (offset == 0);
        if (load) {
            ret = read_object(fd, (char *)data, vmstate_oid,
                              s->inode.nr_copies, data_len, offset,
                              s->cache_flags);
        } else {
            ret = write_object(fd, (char *)data, vmstate_oid,
                               s->inode.nr_copies, data_len, offset, create,
                               s->cache_flags);
        }

        if (ret < 0) {
            /* name the direction that actually failed */
            error_report("failed to %s vmstate %s", load ? "load" : "save",
                         strerror(errno));
            goto cleanup;
        }

        pos += data_len;
        data += data_len;
        remaining -= data_len;
    }
    ret = size;
cleanup:
    closesocket(fd);
    return ret;
}
2226

    
2227
/*
 * Save VM state: flatten the I/O vector into one qemu_blockalign() buffer,
 * pass it to do_load_save_vmstate() in save mode (load == 0) and free the
 * buffer again.  Returns whatever do_load_save_vmstate() returns.
 */
static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
                           int64_t pos)
{
    BDRVSheepdogState *s = bs->opaque;
    void *bounce = qemu_blockalign(bs, qiov->size);
    int result;

    qemu_iovec_to_buf(qiov, 0, bounce, qiov->size);
    result = do_load_save_vmstate(s, (uint8_t *) bounce, pos, qiov->size, 0);
    qemu_vfree(bounce);

    return result;
}
2241

    
2242
/*
 * Load VM state: forwards to do_load_save_vmstate() in load mode
 * (load == 1) and returns its result unchanged.
 */
static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
                           int64_t pos, int size)
{
    BDRVSheepdogState *state = bs->opaque;
    int result = do_load_save_vmstate(state, data, pos, size, 1);

    return result;
}
2249

    
2250

    
2251
/*
 * Coroutine discard entry point.
 *
 * Returns 0 immediately when s->discard_supported is not set.  Otherwise an
 * AIOCB of type AIOCB_DISCARD_OBJ is set up for the sector range and handed
 * to sd_co_rw_vector().  A result <= 0 releases the AIOCB and is returned
 * directly; otherwise the coroutine yields and returns acb->ret afterwards.
 */
static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
                                      int nb_sectors)
{
    BDRVSheepdogState *s = bs->opaque;
    /* NOTE(review): passed to sd_aio_setup() uninitialized; presumably only
     * its address is recorded for discard requests -- confirm. */
    QEMUIOVector placeholder;
    SheepdogAIOCB *acb;
    int submitted;

    if (!s->discard_supported) {
        return 0;
    }

    acb = sd_aio_setup(bs, &placeholder, sector_num, nb_sectors);
    acb->aio_done_func = sd_finish_aiocb;
    acb->aiocb_type = AIOCB_DISCARD_OBJ;

    submitted = sd_co_rw_vector(acb);
    if (submitted > 0) {
        qemu_coroutine_yield();

        return acb->ret;
    }

    qemu_aio_release(acb);
    return submitted;
}
2277

    
2278
static coroutine_fn int64_t
2279
sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2280
                       int *pnum)
2281
{
2282
    BDRVSheepdogState *s = bs->opaque;
2283
    SheepdogInode *inode = &s->inode;
2284
    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
2285
                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2286
                                     BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
2287
    unsigned long idx;
2288
    int64_t ret = BDRV_BLOCK_DATA;
2289

    
2290
    for (idx = start; idx < end; idx++) {
2291
        if (inode->data_vdi_id[idx] == 0) {
2292
            break;
2293
        }
2294
    }
2295
    if (idx == start) {
2296
        /* Get the longest length of unallocated sectors */
2297
        ret = 0;
2298
        for (idx = start + 1; idx < end; idx++) {
2299
            if (inode->data_vdi_id[idx] != 0) {
2300
                break;
2301
            }
2302
        }
2303
    }
2304

    
2305
    *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
2306
    if (*pnum > nb_sectors) {
2307
        *pnum = nb_sectors;
2308
    }
2309
    return ret;
2310
}
2311

    
2312
static QEMUOptionParameter sd_create_options[] = {
2313
    {
2314
        .name = BLOCK_OPT_SIZE,
2315
        .type = OPT_SIZE,
2316
        .help = "Virtual disk size"
2317
    },
2318
    {
2319
        .name = BLOCK_OPT_BACKING_FILE,
2320
        .type = OPT_STRING,
2321
        .help = "File name of a base image"
2322
    },
2323
    {
2324
        .name = BLOCK_OPT_PREALLOC,
2325
        .type = OPT_STRING,
2326
        .help = "Preallocation mode (allowed values: off, full)"
2327
    },
2328
    { NULL }
2329
};
2330

    
2331
static BlockDriver bdrv_sheepdog = {
2332
    .format_name    = "sheepdog",
2333
    .protocol_name  = "sheepdog",
2334
    .instance_size  = sizeof(BDRVSheepdogState),
2335
    .bdrv_file_open = sd_open,
2336
    .bdrv_close     = sd_close,
2337
    .bdrv_create    = sd_create,
2338
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2339
    .bdrv_getlength = sd_getlength,
2340
    .bdrv_truncate  = sd_truncate,
2341

    
2342
    .bdrv_co_readv  = sd_co_readv,
2343
    .bdrv_co_writev = sd_co_writev,
2344
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2345
    .bdrv_co_discard = sd_co_discard,
2346
    .bdrv_co_get_block_status = sd_co_get_block_status,
2347

    
2348
    .bdrv_snapshot_create   = sd_snapshot_create,
2349
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2350
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2351
    .bdrv_snapshot_list     = sd_snapshot_list,
2352

    
2353
    .bdrv_save_vmstate  = sd_save_vmstate,
2354
    .bdrv_load_vmstate  = sd_load_vmstate,
2355

    
2356
    .create_options = sd_create_options,
2357
};
2358

    
2359
static BlockDriver bdrv_sheepdog_tcp = {
2360
    .format_name    = "sheepdog",
2361
    .protocol_name  = "sheepdog+tcp",
2362
    .instance_size  = sizeof(BDRVSheepdogState),
2363
    .bdrv_file_open = sd_open,
2364
    .bdrv_close     = sd_close,
2365
    .bdrv_create    = sd_create,
2366
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2367
    .bdrv_getlength = sd_getlength,
2368
    .bdrv_truncate  = sd_truncate,
2369

    
2370
    .bdrv_co_readv  = sd_co_readv,
2371
    .bdrv_co_writev = sd_co_writev,
2372
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2373
    .bdrv_co_discard = sd_co_discard,
2374
    .bdrv_co_get_block_status = sd_co_get_block_status,
2375

    
2376
    .bdrv_snapshot_create   = sd_snapshot_create,
2377
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2378
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2379
    .bdrv_snapshot_list     = sd_snapshot_list,
2380

    
2381
    .bdrv_save_vmstate  = sd_save_vmstate,
2382
    .bdrv_load_vmstate  = sd_load_vmstate,
2383

    
2384
    .create_options = sd_create_options,
2385
};
2386

    
2387
static BlockDriver bdrv_sheepdog_unix = {
2388
    .format_name    = "sheepdog",
2389
    .protocol_name  = "sheepdog+unix",
2390
    .instance_size  = sizeof(BDRVSheepdogState),
2391
    .bdrv_file_open = sd_open,
2392
    .bdrv_close     = sd_close,
2393
    .bdrv_create    = sd_create,
2394
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2395
    .bdrv_getlength = sd_getlength,
2396
    .bdrv_truncate  = sd_truncate,
2397

    
2398
    .bdrv_co_readv  = sd_co_readv,
2399
    .bdrv_co_writev = sd_co_writev,
2400
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2401
    .bdrv_co_discard = sd_co_discard,
2402
    .bdrv_co_get_block_status = sd_co_get_block_status,
2403

    
2404
    .bdrv_snapshot_create   = sd_snapshot_create,
2405
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2406
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2407
    .bdrv_snapshot_list     = sd_snapshot_list,
2408

    
2409
    .bdrv_save_vmstate  = sd_save_vmstate,
2410
    .bdrv_load_vmstate  = sd_load_vmstate,
2411

    
2412
    .create_options = sd_create_options,
2413
};
2414

    
2415
static void bdrv_sheepdog_init(void)
2416
{
2417
    bdrv_register(&bdrv_sheepdog);
2418
    bdrv_register(&bdrv_sheepdog_tcp);
2419
    bdrv_register(&bdrv_sheepdog_unix);
2420
}
2421
block_init(bdrv_sheepdog_init);