Statistics
| Branch: | Revision:

root / block / sheepdog.c @ 34b5d2c6

History | View | Annotate | Download (64.2 kB)

1
/*
2
 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3
 *
4
 * This program is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU General Public License version
6
 * 2 as published by the Free Software Foundation.
7
 *
8
 * You should have received a copy of the GNU General Public License
9
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
10
 *
11
 * Contributions after 2012-01-13 are licensed under the terms of the
12
 * GNU GPL, version 2 or (at your option) any later version.
13
 */
14

    
15
#include "qemu-common.h"
16
#include "qemu/uri.h"
17
#include "qemu/error-report.h"
18
#include "qemu/sockets.h"
19
#include "block/block_int.h"
20
#include "qemu/bitops.h"
21

    
22
#define SD_PROTO_VER 0x01
23

    
24
#define SD_DEFAULT_ADDR "localhost"
25
#define SD_DEFAULT_PORT 7000
26

    
27
#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
28
#define SD_OP_READ_OBJ       0x02
29
#define SD_OP_WRITE_OBJ      0x03
30
/* 0x04 is used internally by Sheepdog */
31
#define SD_OP_DISCARD_OBJ    0x05
32

    
33
#define SD_OP_NEW_VDI        0x11
34
#define SD_OP_LOCK_VDI       0x12
35
#define SD_OP_RELEASE_VDI    0x13
36
#define SD_OP_GET_VDI_INFO   0x14
37
#define SD_OP_READ_VDIS      0x15
38
#define SD_OP_FLUSH_VDI      0x16
39
#define SD_OP_DEL_VDI        0x17
40

    
41
#define SD_FLAG_CMD_WRITE    0x01
42
#define SD_FLAG_CMD_COW      0x02
43
#define SD_FLAG_CMD_CACHE    0x04 /* Writeback mode for cache */
44
#define SD_FLAG_CMD_DIRECT   0x08 /* Don't use cache */
45

    
46
#define SD_RES_SUCCESS       0x00 /* Success */
47
#define SD_RES_UNKNOWN       0x01 /* Unknown error */
48
#define SD_RES_NO_OBJ        0x02 /* No object found */
49
#define SD_RES_EIO           0x03 /* I/O error */
50
#define SD_RES_VDI_EXIST     0x04 /* Vdi exists already */
51
#define SD_RES_INVALID_PARMS 0x05 /* Invalid parameters */
52
#define SD_RES_SYSTEM_ERROR  0x06 /* System error */
53
#define SD_RES_VDI_LOCKED    0x07 /* Vdi is locked */
54
#define SD_RES_NO_VDI        0x08 /* No vdi found */
55
#define SD_RES_NO_BASE_VDI   0x09 /* No base vdi found */
56
#define SD_RES_VDI_READ      0x0A /* Cannot read requested vdi */
57
#define SD_RES_VDI_WRITE     0x0B /* Cannot write requested vdi */
58
#define SD_RES_BASE_VDI_READ 0x0C /* Cannot read base vdi */
59
#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
60
#define SD_RES_NO_TAG        0x0E /* Requested tag is not found */
61
#define SD_RES_STARTUP       0x0F /* Sheepdog is on starting up */
62
#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
63
#define SD_RES_SHUTDOWN      0x11 /* Sheepdog is shutting down */
64
#define SD_RES_NO_MEM        0x12 /* Cannot allocate memory */
65
#define SD_RES_FULL_VDI      0x13 /* we already have the maximum vdis */
66
#define SD_RES_VER_MISMATCH  0x14 /* Protocol version mismatch */
67
#define SD_RES_NO_SPACE      0x15 /* Server has no room for new objects */
68
#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
69
#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
70
#define SD_RES_JOIN_FAILED   0x18 /* Target node had failed to join sheepdog */
71
#define SD_RES_HALT          0x19 /* Sheepdog is stopped serving IO request */
72
#define SD_RES_READONLY      0x1A /* Object is read-only */
73

    
74
/*
75
 * Object ID rules
76
 *
77
 *  0 - 19 (20 bits): data object space
78
 * 20 - 31 (12 bits): reserved data object space
79
 * 32 - 55 (24 bits): vdi object space
80
 * 56 - 59 ( 4 bits): reserved vdi object space
81
 * 60 - 63 ( 4 bits): object type identifier space
82
 */
83

    
84
#define VDI_SPACE_SHIFT   32
85
#define VDI_BIT (UINT64_C(1) << 63)
86
#define VMSTATE_BIT (UINT64_C(1) << 62)
87
#define MAX_DATA_OBJS (UINT64_C(1) << 20)
88
#define MAX_CHILDREN 1024
89
#define SD_MAX_VDI_LEN 256
90
#define SD_MAX_VDI_TAG_LEN 256
91
#define SD_NR_VDIS   (1U << 24)
92
#define SD_DATA_OBJ_SIZE (UINT64_C(1) << 22)
93
#define SD_MAX_VDI_SIZE (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
94

    
95
#define SD_INODE_SIZE (sizeof(SheepdogInode))
96
#define CURRENT_VDI_ID 0
97

    
98
/*
 * Generic request header.
 *
 * The leading 16 bytes (proto_ver .. data_length) have the same layout in
 * every request and response structure defined in this file; the trailing
 * 32 bytes are interpreted according to the opcode.
 */
typedef struct SheepdogReq {
    uint8_t proto_ver;              /* protocol version (SD_PROTO_VER) */
    uint8_t opcode;                 /* one of the SD_OP_* codes */
    uint16_t flags;                 /* SD_FLAG_CMD_* bits */
    uint32_t epoch;
    uint32_t id;                    /* request identifier */
    uint32_t data_length;           /* length of the data accompanying the header */
    uint32_t opcode_specific[8];    /* opcode dependent part */
} SheepdogReq;
107

    
108
/*
 * Generic response header.
 *
 * Same size as SheepdogReq; the first opcode specific word of the request
 * is replaced by the result code.
 */
typedef struct SheepdogRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;                    /* identifier of the answered request */
    uint32_t data_length;           /* length of the data following the header */
    uint32_t result;                /* one of the SD_RES_* codes */
    uint32_t opcode_specific[7];    /* opcode dependent part */
} SheepdogRsp;
118

    
119
/*
 * Request header for object operations (read, write, create, discard).
 * Shares the common 16 byte prefix with SheepdogReq.
 */
typedef struct SheepdogObjReq {
    /* common prefix */
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;

    /* object specific part */
    uint64_t oid;                   /* object the request operates on */
    uint64_t cow_oid;               /* base object id for copy-on-write requests */
    uint32_t copies;
    uint32_t rsvd;
    uint64_t offset;                /* byte offset inside the object */
} SheepdogObjReq;
132

    
133
/*
 * Response header for object operations.
 * Shares the common 16 byte prefix with SheepdogRsp.
 */
typedef struct SheepdogObjRsp {
    /* common prefix */
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;                    /* identifier of the answered request */
    uint32_t data_length;

    /* object specific part */
    uint32_t result;                /* one of the SD_RES_* codes */
    uint32_t copies;
    uint32_t pad[6];
} SheepdogObjRsp;
144

    
145
/*
 * Request header for VDI operations (lookup, lock, ...).
 * Shares the common 16 byte prefix with SheepdogReq.
 */
typedef struct SheepdogVdiReq {
    /* common prefix */
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;

    /* vdi specific part */
    uint64_t vdi_size;
    uint32_t vdi_id;
    uint32_t copies;
    uint32_t snapid;                /* snapshot id to look up */
    uint32_t pad[3];
} SheepdogVdiReq;
158

    
159
/*
 * Response header for VDI operations.
 * Shares the common 16 byte prefix with SheepdogRsp.
 */
typedef struct SheepdogVdiRsp {
    /* common prefix */
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;

    /* vdi specific part */
    uint32_t result;                /* one of the SD_RES_* codes */
    uint32_t rsvd;
    uint32_t vdi_id;                /* id of the vdi the request resolved to */
    uint32_t pad[5];
} SheepdogVdiRsp;
171

    
172
typedef struct SheepdogInode {
173
    char name[SD_MAX_VDI_LEN];
174
    char tag[SD_MAX_VDI_TAG_LEN];
175
    uint64_t ctime;
176
    uint64_t snap_ctime;
177
    uint64_t vm_clock_nsec;
178
    uint64_t vdi_size;
179
    uint64_t vm_state_size;
180
    uint16_t copy_policy;
181
    uint8_t nr_copies;
182
    uint8_t block_size_shift;
183
    uint32_t snap_id;
184
    uint32_t vdi_id;
185
    uint32_t parent_vdi_id;
186
    uint32_t child_vdi_id[MAX_CHILDREN];
187
    uint32_t data_vdi_id[MAX_DATA_OBJS];
188
} SheepdogInode;
189

    
190
/*
191
 * 64 bit FNV-1a non-zero initial basis
192
 */
193
#define FNV1A_64_INIT ((uint64_t)0xcbf29ce484222325ULL)
194

    
195
/*
 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 *
 * Folds the `len' bytes starting at `buf' into the running hash `hval'
 * and returns the updated hash.  Pass FNV1A_64_INIT as `hval' to start a
 * new hash, or a previous return value to keep hashing further data.
 * With `len' == 0 the function returns `hval' unchanged.
 *
 * The buffer is only read, so it is taken as a pointer to const.
 */
static inline uint64_t fnv_64a_buf(const void *buf, size_t len, uint64_t hval)
{
    const unsigned char *bp = buf;
    const unsigned char *be = bp + len;

    while (bp < be) {
        /* xor the bottom of the hash with the current octet ... */
        hval ^= (uint64_t) *bp++;
        /*
         * ... then multiply by the 64 bit FNV prime 0x100000001b3,
         * written out as shifts and adds.
         */
        hval += (hval << 1) + (hval << 4) + (hval << 5) +
            (hval << 7) + (hval << 8) + (hval << 40);
    }
    return hval;
}
209

    
210
/*
 * Tell whether the data object at index `idx' is recorded in `inode' as
 * belonging to the inode's own vdi.
 */
static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
{
    uint32_t owner_vid = inode->data_vdi_id[idx];

    return owner_vid == inode->vdi_id;
}
214

    
215
static inline bool is_data_obj(uint64_t oid)
216
{
217
    return !(VDI_BIT & oid);
218
}
219

    
220
static inline uint64_t data_oid_to_idx(uint64_t oid)
221
{
222
    return oid & (MAX_DATA_OBJS - 1);
223
}
224

    
225
static inline uint64_t vid_to_vdi_oid(uint32_t vid)
226
{
227
    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
228
}
229

    
230
static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
231
{
232
    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
233
}
234

    
235
static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
236
{
237
    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
238
}
239

    
240
static inline bool is_snapshot(struct SheepdogInode *inode)
241
{
242
    return !!inode->snap_ctime;
243
}
244

    
245
#undef DPRINTF
246
#ifdef DEBUG_SDOG
247
#define DPRINTF(fmt, args...)                                       \
248
    do {                                                            \
249
        fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
250
    } while (0)
251
#else
252
#define DPRINTF(fmt, args...)
253
#endif
254

    
255
typedef struct SheepdogAIOCB SheepdogAIOCB;
256

    
257
typedef struct AIOReq {
258
    SheepdogAIOCB *aiocb;
259
    unsigned int iov_offset;
260

    
261
    uint64_t oid;
262
    uint64_t base_oid;
263
    uint64_t offset;
264
    unsigned int data_len;
265
    uint8_t flags;
266
    uint32_t id;
267

    
268
    QLIST_ENTRY(AIOReq) aio_siblings;
269
} AIOReq;
270

    
271
/* Kind of operation a SheepdogAIOCB carries out. */
enum AIOCBState {
    AIOCB_WRITE_UDATA = 0,
    AIOCB_READ_UDATA  = 1,
    AIOCB_FLUSH_CACHE = 2,
    AIOCB_DISCARD_OBJ = 3,
};
277

    
278
struct SheepdogAIOCB {
279
    BlockDriverAIOCB common;
280

    
281
    QEMUIOVector *qiov;
282

    
283
    int64_t sector_num;
284
    int nb_sectors;
285

    
286
    int ret;
287
    enum AIOCBState aiocb_type;
288

    
289
    Coroutine *coroutine;
290
    void (*aio_done_func)(SheepdogAIOCB *);
291

    
292
    bool canceled;
293
    int nr_pending;
294
};
295

    
296
typedef struct BDRVSheepdogState {
297
    SheepdogInode inode;
298

    
299
    uint32_t min_dirty_data_idx;
300
    uint32_t max_dirty_data_idx;
301

    
302
    char name[SD_MAX_VDI_LEN];
303
    bool is_snapshot;
304
    uint32_t cache_flags;
305
    bool discard_supported;
306

    
307
    char *host_spec;
308
    bool is_unix;
309
    int fd;
310

    
311
    CoMutex lock;
312
    Coroutine *co_send;
313
    Coroutine *co_recv;
314

    
315
    uint32_t aioreq_seq_num;
316
    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
317
    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
318
} BDRVSheepdogState;
319

    
320
static const char * sd_strerror(int err)
321
{
322
    int i;
323

    
324
    static const struct {
325
        int err;
326
        const char *desc;
327
    } errors[] = {
328
        {SD_RES_SUCCESS, "Success"},
329
        {SD_RES_UNKNOWN, "Unknown error"},
330
        {SD_RES_NO_OBJ, "No object found"},
331
        {SD_RES_EIO, "I/O error"},
332
        {SD_RES_VDI_EXIST, "VDI exists already"},
333
        {SD_RES_INVALID_PARMS, "Invalid parameters"},
334
        {SD_RES_SYSTEM_ERROR, "System error"},
335
        {SD_RES_VDI_LOCKED, "VDI is already locked"},
336
        {SD_RES_NO_VDI, "No vdi found"},
337
        {SD_RES_NO_BASE_VDI, "No base VDI found"},
338
        {SD_RES_VDI_READ, "Failed read the requested VDI"},
339
        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
340
        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
341
        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
342
        {SD_RES_NO_TAG, "Failed to find the requested tag"},
343
        {SD_RES_STARTUP, "The system is still booting"},
344
        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
345
        {SD_RES_SHUTDOWN, "The system is shutting down"},
346
        {SD_RES_NO_MEM, "Out of memory on the server"},
347
        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
348
        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
349
        {SD_RES_NO_SPACE, "Server has no space for new objects"},
350
        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
351
        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
352
        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
353
        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
354
        {SD_RES_READONLY, "Object is read-only"},
355
    };
356

    
357
    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
358
        if (errors[i].err == err) {
359
            return errors[i].desc;
360
        }
361
    }
362

    
363
    return "Invalid error code";
364
}
365

    
366
/*
367
 * Sheepdog I/O handling:
368
 *
369
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
370
 *    link the requests to the inflight_aio_head list in the
371
 *    BDRVSheepdogState.  The function exits without waiting for
372
 *    receiving the response.
373
 *
374
 * 2. We receive the response in aio_read_response, the fd handler to
375
 *    the sheepdog connection.  If metadata update is needed, we send
376
 *    the write request to the vdi object in sd_write_done, the write
377
 *    completion function.  We switch back to sd_co_readv/writev after
378
 *    all the requests belonging to the AIOCB are finished.
379
 */
380

    
381
/*
 * Allocate an AIOReq for `acb' and fill it in from the arguments.
 *
 * The request receives the next sequence number of `s' as its id and the
 * pending counter of `acb' is incremented.  The request is not linked into
 * any list here.
 */
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
                                    uint64_t oid, unsigned int data_len,
                                    uint64_t offset, uint8_t flags,
                                    uint64_t base_oid, unsigned int iov_offset)
{
    AIOReq *req = g_malloc(sizeof(AIOReq));

    req->id = s->aioreq_seq_num++;
    req->aiocb = acb;

    req->oid = oid;
    req->base_oid = base_oid;
    req->offset = offset;
    req->data_len = data_len;
    req->iov_offset = iov_offset;
    req->flags = flags;

    acb->nr_pending++;

    return req;
}
401

    
402
/*
 * Unlink `aio_req' from the list it is on, free it and decrement the
 * pending counter of the AIOCB it belonged to.
 */
static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
{
    /* remember the owner, aio_req is gone after g_free() */
    SheepdogAIOCB *owner = aio_req->aiocb;

    QLIST_REMOVE(aio_req, aio_siblings);
    g_free(aio_req);

    owner->nr_pending -= 1;
}
411

    
412
/*
 * Complete `acb': re-enter the coroutine stored in it unless the AIOCB
 * has been canceled, then release the AIOCB.
 */
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
{
    bool was_canceled = acb->canceled;

    if (!was_canceled) {
        qemu_coroutine_enter(acb->coroutine, NULL);
    }

    qemu_aio_release(acb);
}
419

    
420
/*
 * Cancel callback registered in sd_aiocb_info.
 *
 * Requests that were already handed to the servers cannot be taken back,
 * so the AIOCB is simply completed with -EIO: its coroutine is re-entered
 * and the AIOCB is flagged as canceled afterwards.
 */
static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
{
    SheepdogAIOCB *sd_acb = (SheepdogAIOCB *)blockacb;

    sd_acb->ret = -EIO;
    qemu_coroutine_enter(sd_acb->coroutine, NULL);
    sd_acb->canceled = true;
}
432

    
433
static const AIOCBInfo sd_aiocb_info = {
434
    .aiocb_size = sizeof(SheepdogAIOCB),
435
    .cancel = sd_aio_cancel,
436
};
437

    
438
/*
 * Obtain a SheepdogAIOCB for `bs' and initialise it for an operation on
 * `nb_sectors' sectors starting at `sector_num' with the buffers in `qiov'.
 *
 * The calling coroutine is recorded in the AIOCB.  aiocb_type and
 * aio_done_func are left for the caller to fill in (aio_done_func starts
 * out as NULL).
 */
static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
                                   int64_t sector_num, int nb_sectors)
{
    SheepdogAIOCB *new_acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);

    /* what to transfer */
    new_acb->qiov = qiov;
    new_acb->sector_num = sector_num;
    new_acb->nb_sectors = nb_sectors;

    /* completion bookkeeping */
    new_acb->coroutine = qemu_coroutine_self();
    new_acb->aio_done_func = NULL;
    new_acb->canceled = false;
    new_acb->nr_pending = 0;
    new_acb->ret = 0;

    return new_acb;
}
457

    
458
/*
 * Open a new connection to the address in s->host_spec, through a unix
 * domain socket when s->is_unix is set and through inet_connect()
 * otherwise.
 *
 * For inet connections the socket is additionally set to no-delay mode;
 * a failure to do so is only reported.  When the connect call reported no
 * error the socket is switched to non-blocking mode, otherwise the error
 * is reported and freed.  In both cases the value returned by the connect
 * call is handed back to the caller (callers treat a negative value as
 * failure).
 */
static int connect_to_sdog(BDRVSheepdogState *s)
{
    Error *local_err = NULL;
    int sock;

    if (!s->is_unix) {
        sock = inet_connect(s->host_spec, &local_err);

        if (local_err == NULL && socket_set_nodelay(sock) < 0) {
            error_report("%s", strerror(errno));
        }
    } else {
        sock = unix_connect(s->host_spec, &local_err);
    }

    if (local_err == NULL) {
        qemu_set_nonblock(sock);
        return sock;
    }

    qerror_report_err(local_err);
    error_free(local_err);
    return sock;
}
485

    
486
/*
 * Send the request header `hdr' followed by `*wlen' bytes of `data' over
 * `sockfd'.
 *
 * Returns a negative value when either part could not be sent completely
 * and the (non-negative) number of data bytes sent otherwise.
 *
 * The results of qemu_co_send() are compared with `!=' on purpose: `ret'
 * is a signed int while sizeof() and *wlen are unsigned, so a `<'
 * comparison would convert a negative `ret' to a huge unsigned value and
 * never fire.  A short transfer is mapped to a negative value as well so
 * that callers testing for `ret < 0' notice it.
 */
static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
                                    unsigned int *wlen)
{
    int ret;

    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
    if (ret != sizeof(*hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        return ret < 0 ? ret : -EIO;
    }

    ret = qemu_co_send(sockfd, data, *wlen);
    if (ret != *wlen) {
        error_report("failed to send a req, %s", strerror(errno));
        return ret < 0 ? ret : -EIO;
    }

    return ret;
}
504

    
505
static void restart_co_req(void *opaque)
506
{
507
    Coroutine *co = opaque;
508

    
509
    qemu_coroutine_enter(co, NULL);
510
}
511

    
512
typedef struct SheepdogReqCo {
513
    int sockfd;
514
    SheepdogReq *hdr;
515
    void *data;
516
    unsigned int *wlen;
517
    unsigned int *rlen;
518
    int ret;
519
    bool finished;
520
} SheepdogReqCo;
521

    
522
static coroutine_fn void do_co_req(void *opaque)
523
{
524
    int ret;
525
    Coroutine *co;
526
    SheepdogReqCo *srco = opaque;
527
    int sockfd = srco->sockfd;
528
    SheepdogReq *hdr = srco->hdr;
529
    void *data = srco->data;
530
    unsigned int *wlen = srco->wlen;
531
    unsigned int *rlen = srco->rlen;
532

    
533
    co = qemu_coroutine_self();
534
    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, co);
535

    
536
    ret = send_co_req(sockfd, hdr, data, wlen);
537
    if (ret < 0) {
538
        goto out;
539
    }
540

    
541
    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, co);
542

    
543
    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
544
    if (ret < sizeof(*hdr)) {
545
        error_report("failed to get a rsp, %s", strerror(errno));
546
        ret = -errno;
547
        goto out;
548
    }
549

    
550
    if (*rlen > hdr->data_length) {
551
        *rlen = hdr->data_length;
552
    }
553

    
554
    if (*rlen) {
555
        ret = qemu_co_recv(sockfd, data, *rlen);
556
        if (ret < *rlen) {
557
            error_report("failed to get the data, %s", strerror(errno));
558
            ret = -errno;
559
            goto out;
560
        }
561
    }
562
    ret = 0;
563
out:
564
    /* there is at most one request for this sockfd, so it is safe to
565
     * set each handler to NULL. */
566
    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL);
567

    
568
    srco->ret = ret;
569
    srco->finished = true;
570
}
571

    
572
/*
 * Send the request `hdr' with *wlen bytes of `data' over `sockfd' and wait
 * for the response (see do_co_req() for the exchange itself).
 *
 * Inside a coroutine the exchange runs directly in the caller.  Otherwise
 * a new coroutine is created for it and qemu_aio_wait() is called until
 * that coroutine has finished.
 *
 * Returns the result stored by do_co_req().
 */
static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                  unsigned int *wlen, unsigned int *rlen)
{
    SheepdogReqCo srco;

    srco.sockfd = sockfd;
    srco.hdr = hdr;
    srco.data = data;
    srco.wlen = wlen;
    srco.rlen = rlen;
    srco.ret = 0;
    srco.finished = false;

    if (qemu_in_coroutine()) {
        do_co_req(&srco);
        return srco.ret;
    }

    qemu_coroutine_enter(qemu_coroutine_create(do_co_req), &srco);
    while (!srco.finished) {
        qemu_aio_wait();
    }

    return srco.ret;
}
598

    
599
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
600
                           struct iovec *iov, int niov, bool create,
601
                           enum AIOCBState aiocb_type);
602
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
603

    
604

    
605
/*
 * Return the first request on the pending list of `s' that targets object
 * `oid', or NULL when there is none.
 */
static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
    AIOReq *candidate;

    QLIST_FOREACH(candidate, &s->pending_aio_head, aio_siblings) {
        if (candidate->oid != oid) {
            continue;
        }
        return candidate;
    }

    return NULL;
}
617

    
618
/*
619
 * This function searchs pending requests to the object `oid', and
620
 * sends them.
621
 */
622
static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
623
{
624
    AIOReq *aio_req;
625
    SheepdogAIOCB *acb;
626
    int ret;
627

    
628
    while ((aio_req = find_pending_req(s, oid)) != NULL) {
629
        acb = aio_req->aiocb;
630
        /* move aio_req from pending list to inflight one */
631
        QLIST_REMOVE(aio_req, aio_siblings);
632
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
633
        ret = add_aio_request(s, aio_req, acb->qiov->iov,
634
                              acb->qiov->niov, false, acb->aiocb_type);
635
        if (ret < 0) {
636
            error_report("add_aio_request is failed");
637
            free_aio_req(s, aio_req);
638
            if (!acb->nr_pending) {
639
                sd_finish_aiocb(acb);
640
            }
641
        }
642
    }
643
}
644

    
645
/*
646
 * Receive responses of the I/O requests.
647
 *
648
 * This function is registered as a fd handler, and called from the
649
 * main loop when s->fd is ready for reading responses.
650
 */
651
static void coroutine_fn aio_read_response(void *opaque)
652
{
653
    SheepdogObjRsp rsp;
654
    BDRVSheepdogState *s = opaque;
655
    int fd = s->fd;
656
    int ret;
657
    AIOReq *aio_req = NULL;
658
    SheepdogAIOCB *acb;
659
    uint64_t idx;
660

    
661
    if (QLIST_EMPTY(&s->inflight_aio_head)) {
662
        goto out;
663
    }
664

    
665
    /* read a header */
666
    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
667
    if (ret < 0) {
668
        error_report("failed to get the header, %s", strerror(errno));
669
        goto out;
670
    }
671

    
672
    /* find the right aio_req from the inflight aio list */
673
    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
674
        if (aio_req->id == rsp.id) {
675
            break;
676
        }
677
    }
678
    if (!aio_req) {
679
        error_report("cannot find aio_req %x", rsp.id);
680
        goto out;
681
    }
682

    
683
    acb = aio_req->aiocb;
684

    
685
    switch (acb->aiocb_type) {
686
    case AIOCB_WRITE_UDATA:
687
        /* this coroutine context is no longer suitable for co_recv
688
         * because we may send data to update vdi objects */
689
        s->co_recv = NULL;
690
        if (!is_data_obj(aio_req->oid)) {
691
            break;
692
        }
693
        idx = data_oid_to_idx(aio_req->oid);
694

    
695
        if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
696
            /*
697
             * If the object is newly created one, we need to update
698
             * the vdi object (metadata object).  min_dirty_data_idx
699
             * and max_dirty_data_idx are changed to include updated
700
             * index between them.
701
             */
702
            if (rsp.result == SD_RES_SUCCESS) {
703
                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
704
                s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
705
                s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
706
            }
707
            /*
708
             * Some requests may be blocked because simultaneous
709
             * create requests are not allowed, so we search the
710
             * pending requests here.
711
             */
712
            send_pending_req(s, aio_req->oid);
713
        }
714
        break;
715
    case AIOCB_READ_UDATA:
716
        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
717
                            aio_req->iov_offset, rsp.data_length);
718
        if (ret < 0) {
719
            error_report("failed to get the data, %s", strerror(errno));
720
            goto out;
721
        }
722
        break;
723
    case AIOCB_FLUSH_CACHE:
724
        if (rsp.result == SD_RES_INVALID_PARMS) {
725
            DPRINTF("disable cache since the server doesn't support it\n");
726
            s->cache_flags = SD_FLAG_CMD_DIRECT;
727
            rsp.result = SD_RES_SUCCESS;
728
        }
729
        break;
730
    case AIOCB_DISCARD_OBJ:
731
        switch (rsp.result) {
732
        case SD_RES_INVALID_PARMS:
733
            error_report("sheep(%s) doesn't support discard command",
734
                         s->host_spec);
735
            rsp.result = SD_RES_SUCCESS;
736
            s->discard_supported = false;
737
            break;
738
        case SD_RES_SUCCESS:
739
            idx = data_oid_to_idx(aio_req->oid);
740
            s->inode.data_vdi_id[idx] = 0;
741
            break;
742
        default:
743
            break;
744
        }
745
    }
746

    
747
    switch (rsp.result) {
748
    case SD_RES_SUCCESS:
749
        break;
750
    case SD_RES_READONLY:
751
        ret = resend_aioreq(s, aio_req);
752
        if (ret == SD_RES_SUCCESS) {
753
            goto out;
754
        }
755
        /* fall through */
756
    default:
757
        acb->ret = -EIO;
758
        error_report("%s", sd_strerror(rsp.result));
759
        break;
760
    }
761

    
762
    free_aio_req(s, aio_req);
763
    if (!acb->nr_pending) {
764
        /*
765
         * We've finished all requests which belong to the AIOCB, so
766
         * we can switch back to sd_co_readv/writev now.
767
         */
768
        acb->aio_done_func(acb);
769
    }
770
out:
771
    s->co_recv = NULL;
772
}
773

    
774
static void co_read_response(void *opaque)
775
{
776
    BDRVSheepdogState *s = opaque;
777

    
778
    if (!s->co_recv) {
779
        s->co_recv = qemu_coroutine_create(aio_read_response);
780
    }
781

    
782
    qemu_coroutine_enter(s->co_recv, opaque);
783
}
784

    
785
static void co_write_request(void *opaque)
786
{
787
    BDRVSheepdogState *s = opaque;
788

    
789
    qemu_coroutine_enter(s->co_send, NULL);
790
}
791

    
792
/*
793
 * Return a socket discriptor to read/write objects.
794
 *
795
 * We cannot use this discriptor for other operations because
796
 * the block driver may be on waiting response from the server.
797
 */
798
static int get_sheep_fd(BDRVSheepdogState *s)
799
{
800
    int fd;
801

    
802
    fd = connect_to_sdog(s);
803
    if (fd < 0) {
804
        return fd;
805
    }
806

    
807
    qemu_aio_set_fd_handler(fd, co_read_response, NULL, s);
808
    return fd;
809
}
810

    
811
/*
 * Parse a sheepdog URI.
 *
 * Accepted forms:
 *   sheepdog[+tcp]://[host:port]/vdiname[#snapid|#tag]
 *   sheepdog+unix:///vdiname?socket=path[#snapid|#tag]
 *
 * On success s->is_unix and s->host_spec (newly allocated) describe the
 * server, the vdi name is copied to `vdi' (at most SD_MAX_VDI_LEN bytes)
 * and the fragment, if any, is stored either as a number in `*snapid' or,
 * when it does not convert to a non-zero number, as a string in `tag'
 * (at most SD_MAX_VDI_TAG_LEN bytes).  Without a fragment `*snapid' is
 * set to CURRENT_VDI_ID.
 *
 * Returns 0 on success and -EINVAL when the URI cannot be parsed, has no
 * or an unknown scheme, has no vdi name, or carries query parameters that
 * do not fit the transport.
 */
static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
                        char *vdi, uint32_t *snapid, char *tag)
{
    URI *uri;
    QueryParams *qp = NULL;
    int ret = 0;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport */
    if (uri->scheme == NULL) {
        /* never hand a missing scheme to strcmp() */
        ret = -EINVAL;
        goto out;
    } else if (!strcmp(uri->scheme, "sheepdog")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
        s->is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    if (uri->path == NULL || !strcmp(uri->path, "/")) {
        ret = -EINVAL;
        goto out;
    }
    /* skip the leading character of the path */
    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);

    qp = query_params_parse(uri->query);
    /* unix needs exactly one parameter, tcp must not have any */
    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (s->is_unix) {
        /* sheepdog+unix:///vdiname?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        s->host_spec = g_strdup(qp->p[0].value);
    } else {
        /* sheepdog[+tcp]://[host:port]/vdiname */
        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
                                       uri->port ?: SD_DEFAULT_PORT);
    }

    /* snapshot tag */
    if (uri->fragment) {
        *snapid = strtoul(uri->fragment, NULL, 10);
        if (*snapid == 0) {
            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
        }
    } else {
        *snapid = CURRENT_VDI_ID; /* search current vdi */
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
877

    
878
/*
879
 * Parse a filename (old syntax)
880
 *
881
 * filename must be one of the following formats:
882
 *   1. [vdiname]
883
 *   2. [vdiname]:[snapid]
884
 *   3. [vdiname]:[tag]
885
 *   4. [hostname]:[port]:[vdiname]
886
 *   5. [hostname]:[port]:[vdiname]:[snapid]
887
 *   6. [hostname]:[port]:[vdiname]:[tag]
888
 *
889
 * You can boot from the snapshot images by specifying `snapid` or
890
 * `tag'.
891
 *
892
 * You can run VMs outside the Sheepdog cluster by specifying
893
 * `hostname' and `port' (experimental).
894
 */
895
static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
                         char *vdi, uint32_t *snapid, char *tag)
{
    const char *host_spec = "";
    const char *vdi_spec;
    char *copy, *cur, *uri;
    int colons = 0;
    int ret;

    /* drop an optional "sheepdog:" prefix and work on a private copy */
    strstart(filename, "sheepdog:", (const char **)&filename);
    copy = g_strdup(filename);

    /* count the number of separators */
    for (cur = copy; *cur != '\0'; cur++) {
        if (*cur == ':') {
            colons++;
        }
    }

    /* with two or more separators the first two tokens are the host_spec */
    cur = copy;
    if (colons >= 2) {
        host_spec = copy;
        cur = strchr(cur, ':');
        cur = strchr(cur + 1, ':');
        *cur = '\0';
        cur++;
    }
    vdi_spec = cur;

    /* a remaining separator introduces the snapid/tag: make it a fragment */
    cur = strchr(vdi_spec, ':');
    if (cur != NULL) {
        *cur = '#';
    }

    /* rewrite the old syntax as a URI and let sd_parse_uri() do the rest */
    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
    ret = sd_parse_uri(s, uri, vdi, snapid, tag);

    g_free(uri);
    g_free(copy);

    return ret;
}
942

    
943
/*
 * Look up the vdi named `filename' (with snapshot id `snapid' or tag
 * `tag') on the server and store its id in `*vid'.
 *
 * A separate connection is opened for the lookup and closed again before
 * returning.  With `lock' set the request is sent as SD_OP_LOCK_VDI,
 * otherwise as SD_OP_GET_VDI_INFO.
 *
 * Returns 0 on success, -ENOENT when the server answers SD_RES_NO_VDI,
 * -EIO for any other server error, and the failing call's result when
 * connecting or the request exchange fails.
 */
static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
                         uint32_t snapid, const char *tag, uint32_t *vid,
                         bool lock)
{
    char payload[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
    SheepdogVdiReq hdr;
    /* the response is received into the buffer of the request header */
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
    unsigned int rlen = 0;
    int sock, ret;

    sock = connect_to_sdog(s);
    if (sock < 0) {
        return sock;
    }

    /* This pair of strncpy calls ensures that the buffer is zero-filled,
     * which is desirable since we'll soon be sending those bytes, and
     * don't want the send_req to read uninitialized data.
     */
    strncpy(payload, filename, SD_MAX_VDI_LEN);
    strncpy(payload + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);

    memset(&hdr, 0, sizeof(hdr));
    hdr.proto_ver = SD_PROTO_VER;
    hdr.opcode = lock ? SD_OP_LOCK_VDI : SD_OP_GET_VDI_INFO;
    hdr.flags = SD_FLAG_CMD_WRITE;
    hdr.data_length = wlen;
    hdr.snapid = snapid;

    ret = do_req(sock, (SheepdogReq *)&hdr, payload, &wlen, &rlen);
    if (ret == 0) {
        if (rsp->result == SD_RES_SUCCESS) {
            *vid = rsp->vdi_id;
        } else {
            error_report("cannot get vdi info, %s, %s %d %s",
                         sd_strerror(rsp->result), filename, snapid, tag);
            ret = (rsp->result == SD_RES_NO_VDI) ? -ENOENT : -EIO;
        }
    }

    closesocket(sock);
    return ret;
}
999

    
1000
/*
 * Build the object request header for @aio_req and send it (followed by
 * the payload for writes) over the shared socket s->fd.
 *
 * @iov/@niov describe the caller's buffer; only write requests
 * (AIOCB_WRITE_UDATA) transmit data from it, starting at
 * aio_req->iov_offset.  @create selects SD_OP_CREATE_AND_WRITE_OBJ instead
 * of SD_OP_WRITE_OBJ for writes.
 *
 * The send is done under s->lock with the socket corked.  The cork is
 * removed, the write handler is dropped and the lock is released on every
 * path, including failures, so a failed send cannot leave the socket in
 * the "sending" state.
 *
 * Returns 0 on success or a negative errno value when sending fails.
 */
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                           struct iovec *iov, int niov, bool create,
                           enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    int err = 0;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;

    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        /* only writes carry a payload after the header */
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        hdr.opcode = SD_OP_DISCARD_OBJ;
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret < 0) {
        /* capture errno before any other call can overwrite it */
        err = errno ? errno : EIO;
        error_report("failed to send a req, %s", strerror(err));
        goto out;
    }

    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret < 0) {
            err = errno ? errno : EIO;
            error_report("failed to send a data, %s", strerror(err));
        }
    }

out:
    /* always undo the send-side setup, whether or not the send worked */
    socket_set_cork(s->fd, 0);
    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL, s);
    qemu_co_mutex_unlock(&s->lock);

    return -err;
}
1083

    
1084
static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
1085
                             unsigned int datalen, uint64_t offset,
1086
                             bool write, bool create, uint32_t cache_flags)
1087
{
1088
    SheepdogObjReq hdr;
1089
    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1090
    unsigned int wlen, rlen;
1091
    int ret;
1092

    
1093
    memset(&hdr, 0, sizeof(hdr));
1094

    
1095
    if (write) {
1096
        wlen = datalen;
1097
        rlen = 0;
1098
        hdr.flags = SD_FLAG_CMD_WRITE;
1099
        if (create) {
1100
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1101
        } else {
1102
            hdr.opcode = SD_OP_WRITE_OBJ;
1103
        }
1104
    } else {
1105
        wlen = 0;
1106
        rlen = datalen;
1107
        hdr.opcode = SD_OP_READ_OBJ;
1108
    }
1109

    
1110
    hdr.flags |= cache_flags;
1111

    
1112
    hdr.oid = oid;
1113
    hdr.data_length = datalen;
1114
    hdr.offset = offset;
1115
    hdr.copies = copies;
1116

    
1117
    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1118
    if (ret) {
1119
        error_report("failed to send a request to the sheep");
1120
        return ret;
1121
    }
1122

    
1123
    switch (rsp->result) {
1124
    case SD_RES_SUCCESS:
1125
        return 0;
1126
    default:
1127
        error_report("%s", sd_strerror(rsp->result));
1128
        return -EIO;
1129
    }
1130
}
1131

    
1132
/*
 * Read @datalen bytes of object @oid at @offset into @buf.
 * Thin wrapper around read_write_object() with write and create disabled;
 * the return value is passed through unchanged.
 */
static int read_object(int fd, char *buf, uint64_t oid, int copies,
                       unsigned int datalen, uint64_t offset,
                       uint32_t cache_flags)
{
    const bool is_write = false;
    const bool is_create = false;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, is_create, cache_flags);
}
1139

    
1140
/*
 * Write @datalen bytes from @buf to object @oid at @offset.
 * Thin wrapper around read_write_object() with write enabled; @create is
 * forwarded as given and the return value is passed through unchanged.
 */
static int write_object(int fd, char *buf, uint64_t oid, int copies,
                        unsigned int datalen, uint64_t offset, bool create,
                        uint32_t cache_flags)
{
    const bool is_write = true;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, create, cache_flags);
}
1147

    
1148
/* update inode with the latest state */
1149
static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1150
{
1151
    SheepdogInode *inode;
1152
    int ret = 0, fd;
1153
    uint32_t vid = 0;
1154

    
1155
    fd = connect_to_sdog(s);
1156
    if (fd < 0) {
1157
        return -EIO;
1158
    }
1159

    
1160
    inode = g_malloc(sizeof(s->inode));
1161

    
1162
    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
1163
    if (ret) {
1164
        goto out;
1165
    }
1166

    
1167
    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
1168
                      s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
1169
    if (ret < 0) {
1170
        goto out;
1171
    }
1172

    
1173
    if (inode->vdi_id != s->inode.vdi_id) {
1174
        memcpy(&s->inode, inode, sizeof(s->inode));
1175
    }
1176

    
1177
out:
1178
    g_free(inode);
1179
    closesocket(fd);
1180

    
1181
    return ret;
1182
}
1183

    
1184
/*
 * Re-issue @aio_req after refreshing the cached inode.
 *
 * The request's object id is rebuilt from the (possibly new) s->inode.vdi_id.
 * For write requests the function then decides whether the object has to be
 * created, and whether the write has become a copy-on-write one.  If another
 * in-flight request already targets the same object, the request is moved to
 * the pending list instead of being sent.
 *
 * Returns the negative value from reload_inode() on failure, SD_RES_SUCCESS
 * when the request was parked on the pending list, and otherwise the result
 * of add_aio_request().
 */
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
{
    SheepdogAIOCB *acb = aio_req->aiocb;
    bool need_create = false;
    int rc;

    rc = reload_inode(s, 0, "");
    if (rc < 0) {
        return rc;
    }

    aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
                                   data_oid_to_idx(aio_req->oid));

    /* check whether this request becomes a CoW one */
    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
        int idx = data_oid_to_idx(aio_req->oid);

        if (s->inode.data_vdi_id[idx] == 0) {
            /* the object does not exist yet: plain create */
            need_create = true;
        } else if (!is_data_obj_writable(&s->inode, idx)) {
            AIOReq *other;

            /* link to the pending list if there is another CoW request to
             * the same object */
            QLIST_FOREACH(other, &s->inflight_aio_head, aio_siblings) {
                if (other != aio_req && other->oid == aio_req->oid) {
                    DPRINTF("simultaneous CoW to %" PRIx64 "\n", aio_req->oid);
                    QLIST_REMOVE(aio_req, aio_siblings);
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
                                      aio_siblings);
                    return SD_RES_SUCCESS;
                }
            }

            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx],
                                                idx);
            aio_req->flags |= SD_FLAG_CMD_COW;
            need_create = true;
        }
    }

    return add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                           need_create, acb->aiocb_type);
}
1230

    
1231
/* TODO Convert to fine grained options */
1232
static QemuOptsList runtime_opts = {
1233
    .name = "sheepdog",
1234
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1235
    .desc = {
1236
        {
1237
            .name = "filename",
1238
            .type = QEMU_OPT_STRING,
1239
            .help = "URL to the sheepdog image",
1240
        },
1241
        { /* end of list */ }
1242
    },
1243
};
1244

    
1245
static int sd_open(BlockDriverState *bs, QDict *options, int flags,
1246
                   Error **errp)
1247
{
1248
    int ret, fd;
1249
    uint32_t vid = 0;
1250
    BDRVSheepdogState *s = bs->opaque;
1251
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1252
    uint32_t snapid;
1253
    char *buf = NULL;
1254
    QemuOpts *opts;
1255
    Error *local_err = NULL;
1256
    const char *filename;
1257

    
1258
    opts = qemu_opts_create_nofail(&runtime_opts);
1259
    qemu_opts_absorb_qdict(opts, options, &local_err);
1260
    if (error_is_set(&local_err)) {
1261
        qerror_report_err(local_err);
1262
        error_free(local_err);
1263
        ret = -EINVAL;
1264
        goto out;
1265
    }
1266

    
1267
    filename = qemu_opt_get(opts, "filename");
1268

    
1269
    QLIST_INIT(&s->inflight_aio_head);
1270
    QLIST_INIT(&s->pending_aio_head);
1271
    s->fd = -1;
1272

    
1273
    memset(vdi, 0, sizeof(vdi));
1274
    memset(tag, 0, sizeof(tag));
1275

    
1276
    if (strstr(filename, "://")) {
1277
        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1278
    } else {
1279
        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1280
    }
1281
    if (ret < 0) {
1282
        goto out;
1283
    }
1284
    s->fd = get_sheep_fd(s);
1285
    if (s->fd < 0) {
1286
        ret = s->fd;
1287
        goto out;
1288
    }
1289

    
1290
    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
1291
    if (ret) {
1292
        goto out;
1293
    }
1294

    
1295
    /*
1296
     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
1297
     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
1298
     */
1299
    s->cache_flags = SD_FLAG_CMD_CACHE;
1300
    if (flags & BDRV_O_NOCACHE) {
1301
        s->cache_flags = SD_FLAG_CMD_DIRECT;
1302
    }
1303
    s->discard_supported = true;
1304

    
1305
    if (snapid || tag[0] != '\0') {
1306
        DPRINTF("%" PRIx32 " snapshot inode was open.\n", vid);
1307
        s->is_snapshot = true;
1308
    }
1309

    
1310
    fd = connect_to_sdog(s);
1311
    if (fd < 0) {
1312
        ret = fd;
1313
        goto out;
1314
    }
1315

    
1316
    buf = g_malloc(SD_INODE_SIZE);
1317
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
1318
                      s->cache_flags);
1319

    
1320
    closesocket(fd);
1321

    
1322
    if (ret) {
1323
        goto out;
1324
    }
1325

    
1326
    memcpy(&s->inode, buf, sizeof(s->inode));
1327
    s->min_dirty_data_idx = UINT32_MAX;
1328
    s->max_dirty_data_idx = 0;
1329

    
1330
    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
1331
    pstrcpy(s->name, sizeof(s->name), vdi);
1332
    qemu_co_mutex_init(&s->lock);
1333
    qemu_opts_del(opts);
1334
    g_free(buf);
1335
    return 0;
1336
out:
1337
    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
1338
    if (s->fd >= 0) {
1339
        closesocket(s->fd);
1340
    }
1341
    qemu_opts_del(opts);
1342
    g_free(buf);
1343
    return ret;
1344
}
1345

    
1346
/*
 * Send an SD_OP_NEW_VDI request for a VDI named @filename of @vdi_size
 * bytes, based on @base_vid, with @snapshot placed in the snapid field.
 *
 * A dedicated connection is opened and closed for the request.  When the
 * server reports success and @vdi_id is non-NULL, the new id is stored in
 * *@vdi_id.
 *
 * Returns 0 on success, the value from connect_to_sdog() or do_req() on a
 * transport failure, and -EIO when the server reports an error.
 */
static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
                        uint32_t base_vid, uint32_t *vdi_id, int snapshot)
{
    char name_buf[SD_MAX_VDI_LEN];
    unsigned int wlen = sizeof(name_buf), rlen = 0;
    SheepdogVdiReq hdr;
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    int sock, rc;

    sock = connect_to_sdog(s);
    if (sock < 0) {
        return sock;
    }

    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
     * does not fit in buf?  For now, just truncate and avoid buffer overrun.
     */
    memset(name_buf, 0, sizeof(name_buf));
    pstrcpy(name_buf, sizeof(name_buf), filename);

    memset(&hdr, 0, sizeof(hdr));
    hdr.opcode = SD_OP_NEW_VDI;
    hdr.flags = SD_FLAG_CMD_WRITE;
    hdr.vdi_id = base_vid;
    hdr.snapid = snapshot;
    hdr.vdi_size = vdi_size;
    hdr.data_length = wlen;

    rc = do_req(sock, (SheepdogReq *)&hdr, name_buf, &wlen, &rlen);

    closesocket(sock);

    if (rc == 0) {
        if (rsp->result != SD_RES_SUCCESS) {
            error_report("%s, %s", sd_strerror(rsp->result), filename);
            rc = -EIO;
        } else if (vdi_id) {
            *vdi_id = rsp->vdi_id;
        }
    }

    return rc;
}
1397

    
1398
static int sd_prealloc(const char *filename)
1399
{
1400
    BlockDriverState *bs = NULL;
1401
    uint32_t idx, max_idx;
1402
    int64_t vdi_size;
1403
    void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1404
    Error *local_err = NULL;
1405
    int ret;
1406

    
1407
    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR, &local_err);
1408
    if (ret < 0) {
1409
        qerror_report_err(local_err);
1410
        error_free(local_err);
1411
        goto out;
1412
    }
1413

    
1414
    vdi_size = bdrv_getlength(bs);
1415
    if (vdi_size < 0) {
1416
        ret = vdi_size;
1417
        goto out;
1418
    }
1419
    max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1420

    
1421
    for (idx = 0; idx < max_idx; idx++) {
1422
        /*
1423
         * The created image can be a cloned image, so we need to read
1424
         * a data from the source image.
1425
         */
1426
        ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1427
        if (ret < 0) {
1428
            goto out;
1429
        }
1430
        ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1431
        if (ret < 0) {
1432
            goto out;
1433
        }
1434
    }
1435
out:
1436
    if (bs) {
1437
        bdrv_unref(bs);
1438
    }
1439
    g_free(buf);
1440

    
1441
    return ret;
1442
}
1443

    
1444
static int sd_create(const char *filename, QEMUOptionParameter *options,
1445
                     Error **errp)
1446
{
1447
    int ret = 0;
1448
    uint32_t vid = 0, base_vid = 0;
1449
    int64_t vdi_size = 0;
1450
    char *backing_file = NULL;
1451
    BDRVSheepdogState *s;
1452
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
1453
    uint32_t snapid;
1454
    bool prealloc = false;
1455
    Error *local_err = NULL;
1456

    
1457
    s = g_malloc0(sizeof(BDRVSheepdogState));
1458

    
1459
    memset(vdi, 0, sizeof(vdi));
1460
    memset(tag, 0, sizeof(tag));
1461
    if (strstr(filename, "://")) {
1462
        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
1463
    } else {
1464
        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
1465
    }
1466
    if (ret < 0) {
1467
        goto out;
1468
    }
1469

    
1470
    while (options && options->name) {
1471
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
1472
            vdi_size = options->value.n;
1473
        } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
1474
            backing_file = options->value.s;
1475
        } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
1476
            if (!options->value.s || !strcmp(options->value.s, "off")) {
1477
                prealloc = false;
1478
            } else if (!strcmp(options->value.s, "full")) {
1479
                prealloc = true;
1480
            } else {
1481
                error_report("Invalid preallocation mode: '%s'",
1482
                             options->value.s);
1483
                ret = -EINVAL;
1484
                goto out;
1485
            }
1486
        }
1487
        options++;
1488
    }
1489

    
1490
    if (vdi_size > SD_MAX_VDI_SIZE) {
1491
        error_report("too big image size");
1492
        ret = -EINVAL;
1493
        goto out;
1494
    }
1495

    
1496
    if (backing_file) {
1497
        BlockDriverState *bs;
1498
        BDRVSheepdogState *s;
1499
        BlockDriver *drv;
1500

    
1501
        /* Currently, only Sheepdog backing image is supported. */
1502
        drv = bdrv_find_protocol(backing_file, true);
1503
        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
1504
            error_report("backing_file must be a sheepdog image");
1505
            ret = -EINVAL;
1506
            goto out;
1507
        }
1508

    
1509
        ret = bdrv_file_open(&bs, backing_file, NULL, 0, &local_err);
1510
        if (ret < 0) {
1511
            qerror_report_err(local_err);
1512
            error_free(local_err);
1513
            goto out;
1514
        }
1515

    
1516
        s = bs->opaque;
1517

    
1518
        if (!is_snapshot(&s->inode)) {
1519
            error_report("cannot clone from a non snapshot vdi");
1520
            bdrv_unref(bs);
1521
            ret = -EINVAL;
1522
            goto out;
1523
        }
1524

    
1525
        base_vid = s->inode.vdi_id;
1526
        bdrv_unref(bs);
1527
    }
1528

    
1529
    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0);
1530
    if (!prealloc || ret) {
1531
        goto out;
1532
    }
1533

    
1534
    ret = sd_prealloc(filename);
1535
out:
1536
    g_free(s);
1537
    return ret;
1538
}
1539

    
1540
/*
 * Close the VDI: ask the server to release it (SD_OP_RELEASE_VDI) over a
 * short-lived connection, then tear down the long-lived socket s->fd and
 * free s->host_spec.
 *
 * The local teardown is performed even when the release connection cannot
 * be established, so that a failed connect does not leak the socket, its
 * fd handler or host_spec.
 */
static void sd_close(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogVdiReq hdr;
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen, rlen = 0;
    int fd, ret;

    DPRINTF("%s\n", s->name);

    fd = connect_to_sdog(s);
    if (fd >= 0) {
        memset(&hdr, 0, sizeof(hdr));

        hdr.opcode = SD_OP_RELEASE_VDI;
        hdr.vdi_id = s->inode.vdi_id;
        /* the payload is the NUL-terminated VDI name */
        wlen = strlen(s->name) + 1;
        hdr.data_length = wlen;
        hdr.flags = SD_FLAG_CMD_WRITE;

        ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);

        closesocket(fd);

        /* an already-unlocked VDI is not worth reporting */
        if (!ret && rsp->result != SD_RES_SUCCESS &&
            rsp->result != SD_RES_VDI_NOT_LOCKED) {
            error_report("%s, %s", sd_strerror(rsp->result), s->name);
        }
    }

    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL);
    closesocket(s->fd);
    g_free(s->host_spec);
}
1576

    
1577
/* Return the VDI size recorded in the cached inode. */
static int64_t sd_getlength(BlockDriverState *bs)
{
    BDRVSheepdogState *state = bs->opaque;
    int64_t length = state->inode.vdi_size;

    return length;
}
1583

    
1584
/*
 * Grow the VDI to @offset bytes by rewriting the head of its inode object
 * (everything before data_vdi_id[]) with the new vdi_size.
 *
 * Shrinking and sizes above SD_MAX_VDI_SIZE are rejected with -EINVAL.
 * If the inode update fails, the cached vdi_size is restored so that it
 * keeps matching what was last written to the server.
 *
 * Returns 0 on success and a negative value on failure.
 */
static int sd_truncate(BlockDriverState *bs, int64_t offset)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    unsigned int datalen;
    uint64_t old_size;

    if (offset < s->inode.vdi_size) {
        error_report("shrinking is not supported");
        return -EINVAL;
    } else if (offset > SD_MAX_VDI_SIZE) {
        error_report("too big image size");
        return -EINVAL;
    }

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
    old_size = s->inode.vdi_size;
    s->inode.vdi_size = offset;
    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    /* sockets are closed with closesocket() everywhere else in this file */
    closesocket(fd);

    if (ret < 0) {
        error_report("failed to update an inode.");
        /* the server never saw the new size; do not keep it locally */
        s->inode.vdi_size = old_size;
    }

    return ret;
}
1616

    
1617
/*
1618
 * This function is called after writing data objects.  If we need to
1619
 * update metadata, this sends a write request to the vdi object.
1620
 * Otherwise, this switches back to sd_co_readv/writev.
1621
 */
1622
static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1623
{
1624
    int ret;
1625
    BDRVSheepdogState *s = acb->common.bs->opaque;
1626
    struct iovec iov;
1627
    AIOReq *aio_req;
1628
    uint32_t offset, data_len, mn, mx;
1629

    
1630
    mn = s->min_dirty_data_idx;
1631
    mx = s->max_dirty_data_idx;
1632
    if (mn <= mx) {
1633
        /* we need to update the vdi object. */
1634
        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1635
            mn * sizeof(s->inode.data_vdi_id[0]);
1636
        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1637

    
1638
        s->min_dirty_data_idx = UINT32_MAX;
1639
        s->max_dirty_data_idx = 0;
1640

    
1641
        iov.iov_base = &s->inode;
1642
        iov.iov_len = sizeof(s->inode);
1643
        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1644
                                data_len, offset, 0, 0, offset);
1645
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1646
        ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1647
        if (ret) {
1648
            free_aio_req(s, aio_req);
1649
            acb->ret = -EIO;
1650
            goto out;
1651
        }
1652

    
1653
        acb->aio_done_func = sd_finish_aiocb;
1654
        acb->aiocb_type = AIOCB_WRITE_UDATA;
1655
        return;
1656
    }
1657
out:
1658
    sd_finish_aiocb(acb);
1659
}
1660

    
1661
/* Delete current working VDI on the snapshot chain */
1662
static bool sd_delete(BDRVSheepdogState *s)
1663
{
1664
    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1665
    SheepdogVdiReq hdr = {
1666
        .opcode = SD_OP_DEL_VDI,
1667
        .vdi_id = s->inode.vdi_id,
1668
        .data_length = wlen,
1669
        .flags = SD_FLAG_CMD_WRITE,
1670
    };
1671
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1672
    int fd, ret;
1673

    
1674
    fd = connect_to_sdog(s);
1675
    if (fd < 0) {
1676
        return false;
1677
    }
1678

    
1679
    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1680
    closesocket(fd);
1681
    if (ret) {
1682
        return false;
1683
    }
1684
    switch (rsp->result) {
1685
    case SD_RES_NO_VDI:
1686
        error_report("%s was already deleted", s->name);
1687
        /* fall through */
1688
    case SD_RES_SUCCESS:
1689
        break;
1690
    default:
1691
        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1692
        return false;
1693
    }
1694

    
1695
    return true;
1696
}
1697

    
1698
/*
1699
 * Create a writable VDI from a snapshot
1700
 */
1701
static int sd_create_branch(BDRVSheepdogState *s)
1702
{
1703
    int ret, fd;
1704
    uint32_t vid;
1705
    char *buf;
1706
    bool deleted;
1707

    
1708
    DPRINTF("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1709

    
1710
    buf = g_malloc(SD_INODE_SIZE);
1711

    
1712
    /*
1713
     * Even If deletion fails, we will just create extra snapshot based on
1714
     * the workding VDI which was supposed to be deleted. So no need to
1715
     * false bail out.
1716
     */
1717
    deleted = sd_delete(s);
1718
    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
1719
                       !deleted);
1720
    if (ret) {
1721
        goto out;
1722
    }
1723

    
1724
    DPRINTF("%" PRIx32 " is created.\n", vid);
1725

    
1726
    fd = connect_to_sdog(s);
1727
    if (fd < 0) {
1728
        ret = fd;
1729
        goto out;
1730
    }
1731

    
1732
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1733
                      SD_INODE_SIZE, 0, s->cache_flags);
1734

    
1735
    closesocket(fd);
1736

    
1737
    if (ret < 0) {
1738
        goto out;
1739
    }
1740

    
1741
    memcpy(&s->inode, buf, sizeof(s->inode));
1742

    
1743
    s->is_snapshot = false;
1744
    ret = 0;
1745
    DPRINTF("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1746

    
1747
out:
1748
    g_free(buf);
1749

    
1750
    return ret;
1751
}
1752

    
1753
/*
1754
 * Send I/O requests to the server.
1755
 *
1756
 * This function sends requests to the server, links the requests to
1757
 * the inflight_list in BDRVSheepdogState, and exits without
1758
 * waiting the response.  The responses are received in the
1759
 * `aio_read_response' function which is called from the main loop as
1760
 * a fd handler.
1761
 *
1762
 * Returns 1 when we need to wait a response, 0 when there is no sent
1763
 * request and -errno in error cases.
1764
 */
1765
static int coroutine_fn sd_co_rw_vector(void *p)
1766
{
1767
    SheepdogAIOCB *acb = p;
1768
    int ret = 0;
1769
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
1770
    unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1771
    uint64_t oid;
1772
    uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1773
    BDRVSheepdogState *s = acb->common.bs->opaque;
1774
    SheepdogInode *inode = &s->inode;
1775
    AIOReq *aio_req;
1776

    
1777
    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1778
        /*
1779
         * In the case we open the snapshot VDI, Sheepdog creates the
1780
         * writable VDI when we do a write operation first.
1781
         */
1782
        ret = sd_create_branch(s);
1783
        if (ret) {
1784
            acb->ret = -EIO;
1785
            goto out;
1786
        }
1787
    }
1788

    
1789
    /*
1790
     * Make sure we don't free the aiocb before we are done with all requests.
1791
     * This additional reference is dropped at the end of this function.
1792
     */
1793
    acb->nr_pending++;
1794

    
1795
    while (done != total) {
1796
        uint8_t flags = 0;
1797
        uint64_t old_oid = 0;
1798
        bool create = false;
1799

    
1800
        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1801

    
1802
        len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1803

    
1804
        switch (acb->aiocb_type) {
1805
        case AIOCB_READ_UDATA:
1806
            if (!inode->data_vdi_id[idx]) {
1807
                qemu_iovec_memset(acb->qiov, done, 0, len);
1808
                goto done;
1809
            }
1810
            break;
1811
        case AIOCB_WRITE_UDATA:
1812
            if (!inode->data_vdi_id[idx]) {
1813
                create = true;
1814
            } else if (!is_data_obj_writable(inode, idx)) {
1815
                /* Copy-On-Write */
1816
                create = true;
1817
                old_oid = oid;
1818
                flags = SD_FLAG_CMD_COW;
1819
            }
1820
            break;
1821
        case AIOCB_DISCARD_OBJ:
1822
            /*
1823
             * We discard the object only when the whole object is
1824
             * 1) allocated 2) trimmed. Otherwise, simply skip it.
1825
             */
1826
            if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
1827
                goto done;
1828
            }
1829
            break;
1830
        default:
1831
            break;
1832
        }
1833

    
1834
        if (create) {
1835
            DPRINTF("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
1836
                    inode->vdi_id, oid,
1837
                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1838
            oid = vid_to_data_oid(inode->vdi_id, idx);
1839
            DPRINTF("new oid %" PRIx64 "\n", oid);
1840
        }
1841

    
1842
        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1843

    
1844
        if (create) {
1845
            AIOReq *areq;
1846
            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1847
                if (areq->oid == oid) {
1848
                    /*
1849
                     * Sheepdog cannot handle simultaneous create
1850
                     * requests to the same object.  So we cannot send
1851
                     * the request until the previous request
1852
                     * finishes.
1853
                     */
1854
                    aio_req->flags = 0;
1855
                    aio_req->base_oid = 0;
1856
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
1857
                                      aio_siblings);
1858
                    goto done;
1859
                }
1860
            }
1861
        }
1862

    
1863
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1864
        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1865
                              create, acb->aiocb_type);
1866
        if (ret < 0) {
1867
            error_report("add_aio_request is failed");
1868
            free_aio_req(s, aio_req);
1869
            acb->ret = -EIO;
1870
            goto out;
1871
        }
1872
    done:
1873
        offset = 0;
1874
        idx++;
1875
        done += len;
1876
    }
1877
out:
1878
    if (!--acb->nr_pending) {
1879
        return acb->ret;
1880
    }
1881
    return 1;
1882
}
1883

    
1884
/*
 * Coroutine write entry point.
 *
 * For growable images a write past the current end first extends the VDI
 * through sd_truncate().  The request is then handed to sd_co_rw_vector();
 * when that reports outstanding requests (> 0) the coroutine yields and
 * returns acb->ret once resumed, otherwise the aiocb is released and the
 * value from sd_co_rw_vector() is returned directly.
 */
static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
                        int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb;
    int rc;

    if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
        int64_t end_sector = sector_num + nb_sectors;

        rc = sd_truncate(bs, end_sector * BDRV_SECTOR_SIZE);
        if (rc < 0) {
            return rc;
        }
        bs->total_sectors = end_sector;
    }

    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    acb->aiocb_type = AIOCB_WRITE_UDATA;
    acb->aio_done_func = sd_write_done;

    rc = sd_co_rw_vector(acb);
    if (rc > 0) {
        /* requests are in flight; wait to be resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    qemu_aio_release(acb);
    return rc;
}
1912

    
1913
/*
 * Coroutine read entry point.
 *
 * Hands the request to sd_co_rw_vector(); when that reports outstanding
 * requests (> 0) the coroutine yields and returns acb->ret once resumed,
 * otherwise the aiocb is released and the value from sd_co_rw_vector() is
 * returned directly.
 */
static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
                       int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    int rc;

    acb->aio_done_func = sd_finish_aiocb;
    acb->aiocb_type = AIOCB_READ_UDATA;

    rc = sd_co_rw_vector(acb);
    if (rc > 0) {
        /* requests are in flight; wait to be resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    qemu_aio_release(acb);
    return rc;
}
1933

    
1934
/*
 * Flush the cache for this VDI.
 *
 * Nothing is sent unless cache_flags is exactly SD_FLAG_CMD_CACHE.
 * Otherwise an AIOCB_FLUSH_CACHE request for the vdi object is queued and
 * the coroutine yields until it is resumed, then returns acb->ret.
 * If the request cannot be sent, the negative value from add_aio_request()
 * is returned.
 */
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogAIOCB *acb;
    AIOReq *flush_req;
    int rc;

    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
        return 0;
    }

    acb = sd_aio_setup(bs, NULL, 0, 0);
    acb->aio_done_func = sd_finish_aiocb;
    acb->aiocb_type = AIOCB_FLUSH_CACHE;

    flush_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                              0, 0, 0, 0, 0);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, flush_req, aio_siblings);

    rc = add_aio_request(s, flush_req, NULL, 0, false, acb->aiocb_type);
    if (rc >= 0) {
        qemu_coroutine_yield();
        return acb->ret;
    }

    error_report("add_aio_request is failed");
    free_aio_req(s, flush_req);
    qemu_aio_release(acb);
    return rc;
}
1963

    
1964
/*
 * Create a snapshot of the currently opened VDI.
 *
 * Steps, as performed below:
 *   1) refuse if the opened VDI is itself a snapshot (-EINVAL);
 *   2) store vm_state_size, vm_clock_nsec and the snapshot name (as tag)
 *      in the in-memory inode and write the inode header (everything
 *      except data_vdi_id) to the VDI object;
 *   3) call do_sd_create() for the same name and size, based on the
 *      current vdi_id, obtaining new_vid;
 *   4) read the inode header of new_vid back and copy it over s->inode.
 *
 * Returns the value of the last helper call (0 is presumably success --
 * verify against read_object()), or a negative value on failure.
 */
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    uint32_t new_vid;
    SheepdogInode *inode = NULL;
    unsigned int datalen;

    DPRINTF("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
            s->name, sn_info->vm_state_size, s->is_snapshot);

    if (s->is_snapshot) {
        error_report("You can't create a snapshot of a snapshot VDI, "
                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);

        return -EINVAL;
    }

    DPRINTF("%s %s\n", sn_info->name, sn_info->id_str);

    s->inode.vm_state_size = sn_info->vm_state_size;
    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
    /* It appears that inode.tag does not require a NUL terminator,
     * which means this use of strncpy is ok.
     */
    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);

    /* refresh inode. */
    fd = connect_to_sdog(s);
    if (fd < 0) {
        /* No socket was opened and nothing is allocated yet, so there is
         * nothing to clean up; do not pass an invalid fd to closesocket().
         */
        return fd;
    }

    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    if (ret < 0) {
        error_report("failed to write snapshot's inode.");
        goto cleanup;
    }

    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
                       1);
    if (ret < 0) {
        error_report("failed to create inode for snapshot. %s",
                     strerror(errno));
        goto cleanup;
    }

    inode = g_malloc(datalen);

    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
                      s->inode.nr_copies, datalen, 0, s->cache_flags);

    if (ret < 0) {
        error_report("failed to read new inode info. %s", strerror(errno));
        goto cleanup;
    }

    memcpy(&s->inode, inode, datalen);
    DPRINTF("s->inode: name %s snap_id %x oid %x\n",
            s->inode.name, s->inode.snap_id, s->inode.vdi_id);

cleanup:
    /* inode is NULL on the early failure paths; g_free(NULL) is a no-op */
    g_free(inode);
    closesocket(fd);
    return ret;
}
2034

    
2035
/*
2036
 * We implement rollback(loadvm) operation to the specified snapshot by
2037
 * 1) switch to the snapshot
2038
 * 2) rely on sd_create_branch to delete working VDI and
2039
 * 3) create a new working VDI based on the specified snapshot
2040
 */
2041
static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
{
    BDRVSheepdogState *s = bs->opaque;
    /* byte copy of the driver state, used to roll back on failure */
    BDRVSheepdogState *saved = g_malloc(sizeof(*saved));
    char tag[SD_MAX_VDI_TAG_LEN];
    uint32_t snapid;
    int ret;

    memcpy(saved, s, sizeof(*saved));

    /*
     * A snapshot_id that parses to a non-zero decimal number is used as
     * a snapshot id with an empty tag; anything else is used as a tag.
     */
    snapid = strtoul(snapshot_id, NULL, 10);
    if (!snapid) {
        pstrcpy(tag, sizeof(tag), snapshot_id);
    } else {
        tag[0] = 0;
    }

    ret = reload_inode(s, snapid, tag);
    if (!ret) {
        ret = sd_create_branch(s);
    }

    if (ret) {
        /* recover bdrv_sd_state */
        memcpy(s, saved, sizeof(*saved));
        g_free(saved);

        error_report("failed to open. recover old bdrv_sd_state.");

        return ret;
    }

    g_free(saved);

    return 0;
}
2082

    
2083
static int sd_snapshot_delete(BlockDriverState *bs,
2084
                              const char *snapshot_id,
2085
                              const char *name,
2086
                              Error **errp)
2087
{
2088
    /* FIXME: Delete specified snapshot id.  */
2089
    return 0;
2090
}
2091

    
2092
/*
 * Build the list of snapshots that belong to the opened VDI name.
 *
 * The VDI in-use bitmap is fetched with SD_OP_READ_VDIS.  Starting at the
 * slot derived from the FNV-1a hash of s->name, consecutive in-use VDI ids
 * are probed until an unused id is met or 1024 entries were collected.
 * Every inode whose name equals s->name and for which is_snapshot() is
 * true contributes one entry.
 *
 * On success *psn_tab receives a g_malloc0()'d table and the number of
 * entries found is returned (presumably the caller frees the table --
 * confirm against the block layer).  On failure a negative value is
 * returned, the table is freed here and *psn_tab is set to NULL, so a
 * partially filled table is never leaked to the caller.
 */
static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogReq req;
    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
    QEMUSnapshotInfo *sn_tab = NULL;
    unsigned wlen, rlen;
    int found = 0;
    /* NOTE(review): static presumably keeps the inode off the stack;
     * it also makes this function non-reentrant -- confirm callers. */
    static SheepdogInode inode;
    unsigned long *vdi_inuse;
    unsigned int start_nr;
    uint64_t hval;
    uint32_t vid;

    vdi_inuse = g_malloc(max);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    rlen = max;
    wlen = 0;

    memset(&req, 0, sizeof(req));

    req.opcode = SD_OP_READ_VDIS;
    req.data_length = max;

    ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);

    closesocket(fd);
    if (ret) {
        goto out;
    }

    sn_tab = g_malloc0(nr * sizeof(*sn_tab));

    /* calculate a vdi id with hash function */
    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
    start_nr = hval & (SD_NR_VDIS - 1);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
        if (!test_bit(vid, vdi_inuse)) {
            break;
        }

        /* we don't need to read entire object */
        ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
                          s->cache_flags);

        if (ret) {
            continue;
        }

        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
            /* snap_ctime packs seconds in the upper and nanoseconds in
             * the lower 32 bits, as split below */
            sn_tab[found].date_sec = inode.snap_ctime >> 32;
            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
            sn_tab[found].vm_state_size = inode.vm_state_size;
            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;

            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
                     inode.snap_id);
            pstrcpy(sn_tab[found].name,
                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
                    inode.tag);
            found++;
        }
    }

    closesocket(fd);
out:
    g_free(vdi_inuse);

    if (ret < 0) {
        /* Do not hand a table back together with an error code: the
         * caller cannot know how many entries are valid and would leak
         * it.  g_free(NULL) is a no-op when nothing was allocated.
         */
        g_free(sn_tab);
        *psn_tab = NULL;
        return ret;
    }

    *psn_tab = sn_tab;

    return found;
}
2182

    
2183
/*
 * Transfer @size bytes of VM state between @data and the vmstate objects
 * of a VDI, starting at byte offset @pos.
 *
 * @load: non-zero reads from the objects of s->inode.parent_vdi_id into
 *        @data; zero writes @data to the objects of s->inode.vdi_id.
 *
 * The range is split at SD_DATA_OBJ_SIZE boundaries, one read_object() or
 * write_object() call per piece.  For writes, the create flag is set when
 * a piece starts at offset 0 of its object.
 *
 * Returns @size on success, or the negative value from connect_to_sdog(),
 * read_object() or write_object() on failure.
 */
static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
                                int64_t pos, int size, int load)
{
    bool create;
    int fd, ret = 0, remaining = size;
    unsigned int data_len;
    uint64_t vmstate_oid;
    uint64_t offset;
    uint32_t vdi_index;
    uint32_t vdi_id = load ? s->inode.parent_vdi_id : s->inode.vdi_id;

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    while (remaining) {
        vdi_index = pos / SD_DATA_OBJ_SIZE;
        offset = pos % SD_DATA_OBJ_SIZE;

        /* never cross an object boundary in a single request */
        data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);

        vmstate_oid = vid_to_vmstate_oid(vdi_id, vdi_index);

        create = (offset == 0);
        if (load) {
            ret = read_object(fd, (char *)data, vmstate_oid,
                              s->inode.nr_copies, data_len, offset,
                              s->cache_flags);
        } else {
            ret = write_object(fd, (char *)data, vmstate_oid,
                               s->inode.nr_copies, data_len, offset, create,
                               s->cache_flags);
        }

        if (ret < 0) {
            /* name the operation that actually failed */
            error_report("failed to %s vmstate %s",
                         load ? "load" : "save", strerror(errno));
            goto cleanup;
        }

        pos += data_len;
        data += data_len;
        remaining -= data_len;
    }
    ret = size;
cleanup:
    closesocket(fd);
    return ret;
}
2232

    
2233
/*
 * Save VM state: the I/O vector is flattened into a temporary buffer
 * obtained from qemu_blockalign(), which is then written at @pos through
 * do_load_save_vmstate() (load == 0).  The buffer is released before
 * returning that helper's result.
 */
static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
                           int64_t pos)
{
    BDRVSheepdogState *s = bs->opaque;
    uint8_t *bounce = qemu_blockalign(bs, qiov->size);
    int result;

    qemu_iovec_to_buf(qiov, 0, bounce, qiov->size);
    result = do_load_save_vmstate(s, bounce, pos, qiov->size, 0);
    qemu_vfree(bounce);

    return result;
}
2247

    
2248
/*
 * Load VM state: thin wrapper that forwards to do_load_save_vmstate()
 * with the load flag set and returns its result.
 */
static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
                           int64_t pos, int size)
{
    BDRVSheepdogState *state = bs->opaque;
    const int load = 1;

    return do_load_save_vmstate(state, data, pos, size, load);
}
2255

    
2256

    
2257
/*
 * Coroutine discard entry point.
 *
 * Returns 0 without doing anything when s->discard_supported is false.
 * Otherwise the request goes through the same path as reads: an AIOCB of
 * type AIOCB_DISCARD_OBJ is passed to sd_co_rw_vector(); a positive
 * return makes the coroutine yield and report acb->ret afterwards, any
 * other value releases the AIOCB and is returned as is.
 */
static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
                                      int nb_sectors)
{
    BDRVSheepdogState *s = bs->opaque;
    /* passed to sd_aio_setup() uninitialized, exactly as before;
     * presumably never dereferenced for discards -- TODO confirm */
    QEMUIOVector dummy;
    SheepdogAIOCB *discard_acb;
    int submitted;

    if (!s->discard_supported) {
        return 0;
    }

    discard_acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
    discard_acb->aio_done_func = sd_finish_aiocb;
    discard_acb->aiocb_type = AIOCB_DISCARD_OBJ;

    submitted = sd_co_rw_vector(discard_acb);
    if (submitted > 0) {
        qemu_coroutine_yield();
        return discard_acb->ret;
    }

    qemu_aio_release(discard_acb);
    return submitted;
}
2283

    
2284
static coroutine_fn int64_t
2285
sd_co_get_block_status(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2286
                       int *pnum)
2287
{
2288
    BDRVSheepdogState *s = bs->opaque;
2289
    SheepdogInode *inode = &s->inode;
2290
    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
2291
                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2292
                                     BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
2293
    unsigned long idx;
2294
    int64_t ret = BDRV_BLOCK_DATA;
2295

    
2296
    for (idx = start; idx < end; idx++) {
2297
        if (inode->data_vdi_id[idx] == 0) {
2298
            break;
2299
        }
2300
    }
2301
    if (idx == start) {
2302
        /* Get the longest length of unallocated sectors */
2303
        ret = 0;
2304
        for (idx = start + 1; idx < end; idx++) {
2305
            if (inode->data_vdi_id[idx] != 0) {
2306
                break;
2307
            }
2308
        }
2309
    }
2310

    
2311
    *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
2312
    if (*pnum > nb_sectors) {
2313
        *pnum = nb_sectors;
2314
    }
2315
    return ret;
2316
}
2317

    
2318
static QEMUOptionParameter sd_create_options[] = {
2319
    {
2320
        .name = BLOCK_OPT_SIZE,
2321
        .type = OPT_SIZE,
2322
        .help = "Virtual disk size"
2323
    },
2324
    {
2325
        .name = BLOCK_OPT_BACKING_FILE,
2326
        .type = OPT_STRING,
2327
        .help = "File name of a base image"
2328
    },
2329
    {
2330
        .name = BLOCK_OPT_PREALLOC,
2331
        .type = OPT_STRING,
2332
        .help = "Preallocation mode (allowed values: off, full)"
2333
    },
2334
    { NULL }
2335
};
2336

    
2337
static BlockDriver bdrv_sheepdog = {
2338
    .format_name    = "sheepdog",
2339
    .protocol_name  = "sheepdog",
2340
    .instance_size  = sizeof(BDRVSheepdogState),
2341
    .bdrv_file_open = sd_open,
2342
    .bdrv_close     = sd_close,
2343
    .bdrv_create    = sd_create,
2344
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2345
    .bdrv_getlength = sd_getlength,
2346
    .bdrv_truncate  = sd_truncate,
2347

    
2348
    .bdrv_co_readv  = sd_co_readv,
2349
    .bdrv_co_writev = sd_co_writev,
2350
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2351
    .bdrv_co_discard = sd_co_discard,
2352
    .bdrv_co_get_block_status = sd_co_get_block_status,
2353

    
2354
    .bdrv_snapshot_create   = sd_snapshot_create,
2355
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2356
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2357
    .bdrv_snapshot_list     = sd_snapshot_list,
2358

    
2359
    .bdrv_save_vmstate  = sd_save_vmstate,
2360
    .bdrv_load_vmstate  = sd_load_vmstate,
2361

    
2362
    .create_options = sd_create_options,
2363
};
2364

    
2365
static BlockDriver bdrv_sheepdog_tcp = {
2366
    .format_name    = "sheepdog",
2367
    .protocol_name  = "sheepdog+tcp",
2368
    .instance_size  = sizeof(BDRVSheepdogState),
2369
    .bdrv_file_open = sd_open,
2370
    .bdrv_close     = sd_close,
2371
    .bdrv_create    = sd_create,
2372
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2373
    .bdrv_getlength = sd_getlength,
2374
    .bdrv_truncate  = sd_truncate,
2375

    
2376
    .bdrv_co_readv  = sd_co_readv,
2377
    .bdrv_co_writev = sd_co_writev,
2378
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2379
    .bdrv_co_discard = sd_co_discard,
2380
    .bdrv_co_get_block_status = sd_co_get_block_status,
2381

    
2382
    .bdrv_snapshot_create   = sd_snapshot_create,
2383
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2384
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2385
    .bdrv_snapshot_list     = sd_snapshot_list,
2386

    
2387
    .bdrv_save_vmstate  = sd_save_vmstate,
2388
    .bdrv_load_vmstate  = sd_load_vmstate,
2389

    
2390
    .create_options = sd_create_options,
2391
};
2392

    
2393
static BlockDriver bdrv_sheepdog_unix = {
2394
    .format_name    = "sheepdog",
2395
    .protocol_name  = "sheepdog+unix",
2396
    .instance_size  = sizeof(BDRVSheepdogState),
2397
    .bdrv_file_open = sd_open,
2398
    .bdrv_close     = sd_close,
2399
    .bdrv_create    = sd_create,
2400
    .bdrv_has_zero_init = bdrv_has_zero_init_1,
2401
    .bdrv_getlength = sd_getlength,
2402
    .bdrv_truncate  = sd_truncate,
2403

    
2404
    .bdrv_co_readv  = sd_co_readv,
2405
    .bdrv_co_writev = sd_co_writev,
2406
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2407
    .bdrv_co_discard = sd_co_discard,
2408
    .bdrv_co_get_block_status = sd_co_get_block_status,
2409

    
2410
    .bdrv_snapshot_create   = sd_snapshot_create,
2411
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2412
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2413
    .bdrv_snapshot_list     = sd_snapshot_list,
2414

    
2415
    .bdrv_save_vmstate  = sd_save_vmstate,
2416
    .bdrv_load_vmstate  = sd_load_vmstate,
2417

    
2418
    .create_options = sd_create_options,
2419
};
2420

    
2421
static void bdrv_sheepdog_init(void)
2422
{
2423
    bdrv_register(&bdrv_sheepdog);
2424
    bdrv_register(&bdrv_sheepdog_tcp);
2425
    bdrv_register(&bdrv_sheepdog_unix);
2426
}
2427
block_init(bdrv_sheepdog_init);