Statistics
| Branch: | Revision:

root / block / sheepdog.c @ e304e8e5

History | View | Annotate | Download (64.1 kB)

1
/*
2
 * Copyright (C) 2009-2010 Nippon Telegraph and Telephone Corporation.
3
 *
4
 * This program is free software; you can redistribute it and/or
5
 * modify it under the terms of the GNU General Public License version
6
 * 2 as published by the Free Software Foundation.
7
 *
8
 * You should have received a copy of the GNU General Public License
9
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
10
 *
11
 * Contributions after 2012-01-13 are licensed under the terms of the
12
 * GNU GPL, version 2 or (at your option) any later version.
13
 */
14

    
15
#include "qemu-common.h"
16
#include "qemu/uri.h"
17
#include "qemu/error-report.h"
18
#include "qemu/sockets.h"
19
#include "block/block_int.h"
20
#include "qemu/bitops.h"
21

    
22
/* Protocol version placed in the proto_ver field of request headers. */
#define SD_PROTO_VER        0x01

/* Server address and port used when the URI does not name one. */
#define SD_DEFAULT_ADDR     "localhost"
#define SD_DEFAULT_PORT     7000
26

    
27
/* Opcodes for object requests */
#define SD_OP_CREATE_AND_WRITE_OBJ  0x01
#define SD_OP_READ_OBJ              0x02
#define SD_OP_WRITE_OBJ             0x03
/* 0x04 is used internally by Sheepdog */
#define SD_OP_DISCARD_OBJ           0x05

/* Opcodes for vdi requests */
#define SD_OP_NEW_VDI               0x11
#define SD_OP_LOCK_VDI              0x12
#define SD_OP_RELEASE_VDI           0x13
#define SD_OP_GET_VDI_INFO          0x14
#define SD_OP_READ_VDIS             0x15
#define SD_OP_FLUSH_VDI             0x16
#define SD_OP_DEL_VDI               0x17
40

    
41
/* Bits for the flags field of a request header */
#define SD_FLAG_CMD_WRITE   0x01
#define SD_FLAG_CMD_COW     0x02
#define SD_FLAG_CMD_CACHE   0x04    /* Writeback mode for cache */
#define SD_FLAG_CMD_DIRECT  0x08    /* Don't use cache */
45

    
46
/* Values of the result field of a response header */
#define SD_RES_SUCCESS          0x00 /* Success */
#define SD_RES_UNKNOWN          0x01 /* Unknown error */
#define SD_RES_NO_OBJ           0x02 /* No object found */
#define SD_RES_EIO              0x03 /* I/O error */
#define SD_RES_VDI_EXIST        0x04 /* Vdi exists already */
#define SD_RES_INVALID_PARMS    0x05 /* Invalid parameters */
#define SD_RES_SYSTEM_ERROR     0x06 /* System error */
#define SD_RES_VDI_LOCKED       0x07 /* Vdi is locked */
#define SD_RES_NO_VDI           0x08 /* No vdi found */
#define SD_RES_NO_BASE_VDI      0x09 /* No base vdi found */
#define SD_RES_VDI_READ         0x0A /* Cannot read requested vdi */
#define SD_RES_VDI_WRITE        0x0B /* Cannot write requested vdi */
#define SD_RES_BASE_VDI_READ    0x0C /* Cannot read base vdi */
#define SD_RES_BASE_VDI_WRITE   0x0D /* Cannot write base vdi */
#define SD_RES_NO_TAG           0x0E /* Requested tag is not found */
#define SD_RES_STARTUP          0x0F /* Sheepdog is on starting up */
#define SD_RES_VDI_NOT_LOCKED   0x10 /* Vdi is not locked */
#define SD_RES_SHUTDOWN         0x11 /* Sheepdog is shutting down */
#define SD_RES_NO_MEM           0x12 /* Cannot allocate memory */
#define SD_RES_FULL_VDI         0x13 /* we already have the maximum vdis */
#define SD_RES_VER_MISMATCH     0x14 /* Protocol version mismatch */
#define SD_RES_NO_SPACE         0x15 /* Server has no room for new objects */
#define SD_RES_WAIT_FOR_FORMAT  0x16 /* Waiting for a format operation */
#define SD_RES_WAIT_FOR_JOIN    0x17 /* Waiting for other nodes joining */
#define SD_RES_JOIN_FAILED      0x18 /* Target node had failed to join sheepdog */
#define SD_RES_HALT             0x19 /* Sheepdog is stopped serving IO request */
#define SD_RES_READONLY         0x1A /* Object is read-only */
73

    
74
/*
 * Object ID rules
 *
 *  0 - 19 (20 bits): data object space
 * 20 - 31 (12 bits): reserved data object space
 * 32 - 55 (24 bits): vdi object space
 * 56 - 59 ( 4 bits): reserved vdi object space
 * 60 - 63 ( 4 bits): object type identifier space
 */

#define VDI_SPACE_SHIFT     32
#define VDI_BIT             (UINT64_C(1) << 63)
#define VMSTATE_BIT         (UINT64_C(1) << 62)
#define MAX_DATA_OBJS       (UINT64_C(1) << 20)
#define MAX_CHILDREN        1024
#define SD_MAX_VDI_LEN      256
#define SD_MAX_VDI_TAG_LEN  256
#define SD_NR_VDIS          (1U << 24)
#define SD_DATA_OBJ_SIZE    (UINT64_C(1) << 22)
/* Largest vdi size: every data object slot filled with a full object */
#define SD_MAX_VDI_SIZE     (SD_DATA_OBJ_SIZE * MAX_DATA_OBJS)
94

    
95
/* Size in bytes of the SheepdogInode structure */
#define SD_INODE_SIZE   (sizeof(SheepdogInode))
/* Snapshot id meaning "the current vdi" rather than a snapshot */
#define CURRENT_VDI_ID  0
97

    
98
/*
 * Generic request header.  The object and vdi request structures start
 * with the same proto_ver .. data_length fields and are cast to this
 * type when handed to do_req().
 */
typedef struct SheepdogReq {
    uint8_t proto_ver;
    uint8_t opcode;                 /* one of SD_OP_* */
    uint16_t flags;                 /* SD_FLAG_CMD_* bits */
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;           /* length of the data following the header */
    uint32_t opcode_specific[8];
} SheepdogReq;
107

    
108
/*
 * Generic response header; same size as SheepdogReq, with the first
 * opcode-specific word carrying the result code.
 */
typedef struct SheepdogRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;                /* one of SD_RES_* */
    uint32_t opcode_specific[7];
} SheepdogRsp;
118

    
119
/* Request header for operations on a single object */
typedef struct SheepdogObjReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;                    /* matched against the response id */
    uint32_t data_length;
    uint64_t oid;                   /* target object id */
    uint64_t cow_oid;               /* filled from the request's base_oid */
    uint32_t copies;
    uint32_t rsvd;
    uint64_t offset;                /* offset passed along with the request */
} SheepdogObjReq;
132

    
133
/* Response header for operations on a single object */
typedef struct SheepdogObjRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;                    /* id of the request being answered */
    uint32_t data_length;
    uint32_t result;                /* one of SD_RES_* */
    uint32_t copies;
    uint32_t pad[6];
} SheepdogObjRsp;
144

    
145
/* Request header for vdi operations */
typedef struct SheepdogVdiReq {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint64_t vdi_size;
    uint32_t vdi_id;
    uint32_t copies;
    uint32_t snapid;                /* snapshot id to look up */
    uint32_t pad[3];
} SheepdogVdiReq;
158

    
159
/* Response header for vdi operations */
typedef struct SheepdogVdiRsp {
    uint8_t proto_ver;
    uint8_t opcode;
    uint16_t flags;
    uint32_t epoch;
    uint32_t id;
    uint32_t data_length;
    uint32_t result;                /* one of SD_RES_* */
    uint32_t rsvd;
    uint32_t vdi_id;                /* vdi id reported by the server */
    uint32_t pad[5];
} SheepdogVdiRsp;
171

    
172
typedef struct SheepdogInode {
173
    char name[SD_MAX_VDI_LEN];
174
    char tag[SD_MAX_VDI_TAG_LEN];
175
    uint64_t ctime;
176
    uint64_t snap_ctime;
177
    uint64_t vm_clock_nsec;
178
    uint64_t vdi_size;
179
    uint64_t vm_state_size;
180
    uint16_t copy_policy;
181
    uint8_t nr_copies;
182
    uint8_t block_size_shift;
183
    uint32_t snap_id;
184
    uint32_t vdi_id;
185
    uint32_t parent_vdi_id;
186
    uint32_t child_vdi_id[MAX_CHILDREN];
187
    uint32_t data_vdi_id[MAX_DATA_OBJS];
188
} SheepdogInode;
189

    
190
/*
 * 64 bit FNV-1a non-zero initial basis
 */
#define FNV1A_64_INIT   ((uint64_t)0xcbf29ce484222325ULL)
194

    
195
/*
 * 64 bit Fowler/Noll/Vo FNV-1a hash code
 *
 * Folds the `len' bytes starting at `buf' into the running hash `hval'
 * and returns the updated hash.  With len == 0 the value of `hval' is
 * returned unchanged.
 */
static inline uint64_t fnv_64a_buf(void *buf, size_t len, uint64_t hval)
{
    const unsigned char *bytes = buf;
    size_t i;

    for (i = 0; i < len; i++) {
        /* xor the low byte with the next octet ... */
        hval ^= (uint64_t)bytes[i];
        /* ... then multiply by 2^40 + 2^8 + 0xb3, modulo 2^64 */
        hval *= UINT64_C(0x100000001b3);
    }

    return hval;
}
209

    
210
/*
 * Return true when data object slot `idx' of `inode' records the inode's
 * own vdi id.
 */
static inline bool is_data_obj_writable(SheepdogInode *inode, unsigned int idx)
{
    uint32_t slot_vid = inode->data_vdi_id[idx];

    return slot_vid == inode->vdi_id;
}
214

    
215
static inline bool is_data_obj(uint64_t oid)
216
{
217
    return !(VDI_BIT & oid);
218
}
219

    
220
static inline uint64_t data_oid_to_idx(uint64_t oid)
221
{
222
    return oid & (MAX_DATA_OBJS - 1);
223
}
224

    
225
static inline uint64_t vid_to_vdi_oid(uint32_t vid)
226
{
227
    return VDI_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT);
228
}
229

    
230
static inline uint64_t vid_to_vmstate_oid(uint32_t vid, uint32_t idx)
231
{
232
    return VMSTATE_BIT | ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
233
}
234

    
235
static inline uint64_t vid_to_data_oid(uint32_t vid, uint32_t idx)
236
{
237
    return ((uint64_t)vid << VDI_SPACE_SHIFT) | idx;
238
}
239

    
240
static inline bool is_snapshot(struct SheepdogInode *inode)
241
{
242
    return !!inode->snap_ctime;
243
}
244

    
245
/*
 * Debug tracing: with DEBUG_SDOG defined, dprintf() prints the calling
 * function and line number followed by the formatted message to stdout;
 * otherwise it expands to nothing.
 */
#undef dprintf
#ifdef DEBUG_SDOG
#define dprintf(fmt, args...)                                       \
    do {                                                            \
        fprintf(stdout, "%s %d: " fmt, __func__, __LINE__, ##args); \
    } while (0)
#else
#define dprintf(fmt, args...)
#endif
254

    
255
typedef struct SheepdogAIOCB SheepdogAIOCB;
256

    
257
typedef struct AIOReq {
258
    SheepdogAIOCB *aiocb;
259
    unsigned int iov_offset;
260

    
261
    uint64_t oid;
262
    uint64_t base_oid;
263
    uint64_t offset;
264
    unsigned int data_len;
265
    uint8_t flags;
266
    uint32_t id;
267

    
268
    QLIST_ENTRY(AIOReq) aio_siblings;
269
} AIOReq;
270

    
271
/* Kind of operation an AIOCB performs; selects the opcode of its requests */
enum AIOCBState {
    AIOCB_WRITE_UDATA,
    AIOCB_READ_UDATA,
    AIOCB_FLUSH_CACHE,
    AIOCB_DISCARD_OBJ,
};
277

    
278
struct SheepdogAIOCB {
279
    BlockDriverAIOCB common;
280

    
281
    QEMUIOVector *qiov;
282

    
283
    int64_t sector_num;
284
    int nb_sectors;
285

    
286
    int ret;
287
    enum AIOCBState aiocb_type;
288

    
289
    Coroutine *coroutine;
290
    void (*aio_done_func)(SheepdogAIOCB *);
291

    
292
    bool canceled;
293
    int nr_pending;
294
};
295

    
296
typedef struct BDRVSheepdogState {
297
    SheepdogInode inode;
298

    
299
    uint32_t min_dirty_data_idx;
300
    uint32_t max_dirty_data_idx;
301

    
302
    char name[SD_MAX_VDI_LEN];
303
    bool is_snapshot;
304
    uint32_t cache_flags;
305
    bool discard_supported;
306

    
307
    char *host_spec;
308
    bool is_unix;
309
    int fd;
310

    
311
    CoMutex lock;
312
    Coroutine *co_send;
313
    Coroutine *co_recv;
314

    
315
    uint32_t aioreq_seq_num;
316
    QLIST_HEAD(inflight_aio_head, AIOReq) inflight_aio_head;
317
    QLIST_HEAD(pending_aio_head, AIOReq) pending_aio_head;
318
} BDRVSheepdogState;
319

    
320
static const char * sd_strerror(int err)
321
{
322
    int i;
323

    
324
    static const struct {
325
        int err;
326
        const char *desc;
327
    } errors[] = {
328
        {SD_RES_SUCCESS, "Success"},
329
        {SD_RES_UNKNOWN, "Unknown error"},
330
        {SD_RES_NO_OBJ, "No object found"},
331
        {SD_RES_EIO, "I/O error"},
332
        {SD_RES_VDI_EXIST, "VDI exists already"},
333
        {SD_RES_INVALID_PARMS, "Invalid parameters"},
334
        {SD_RES_SYSTEM_ERROR, "System error"},
335
        {SD_RES_VDI_LOCKED, "VDI is already locked"},
336
        {SD_RES_NO_VDI, "No vdi found"},
337
        {SD_RES_NO_BASE_VDI, "No base VDI found"},
338
        {SD_RES_VDI_READ, "Failed read the requested VDI"},
339
        {SD_RES_VDI_WRITE, "Failed to write the requested VDI"},
340
        {SD_RES_BASE_VDI_READ, "Failed to read the base VDI"},
341
        {SD_RES_BASE_VDI_WRITE, "Failed to write the base VDI"},
342
        {SD_RES_NO_TAG, "Failed to find the requested tag"},
343
        {SD_RES_STARTUP, "The system is still booting"},
344
        {SD_RES_VDI_NOT_LOCKED, "VDI isn't locked"},
345
        {SD_RES_SHUTDOWN, "The system is shutting down"},
346
        {SD_RES_NO_MEM, "Out of memory on the server"},
347
        {SD_RES_FULL_VDI, "We already have the maximum vdis"},
348
        {SD_RES_VER_MISMATCH, "Protocol version mismatch"},
349
        {SD_RES_NO_SPACE, "Server has no space for new objects"},
350
        {SD_RES_WAIT_FOR_FORMAT, "Sheepdog is waiting for a format operation"},
351
        {SD_RES_WAIT_FOR_JOIN, "Sheepdog is waiting for other nodes joining"},
352
        {SD_RES_JOIN_FAILED, "Target node had failed to join sheepdog"},
353
        {SD_RES_HALT, "Sheepdog is stopped serving IO request"},
354
        {SD_RES_READONLY, "Object is read-only"},
355
    };
356

    
357
    for (i = 0; i < ARRAY_SIZE(errors); ++i) {
358
        if (errors[i].err == err) {
359
            return errors[i].desc;
360
        }
361
    }
362

    
363
    return "Invalid error code";
364
}
365

    
366
/*
 * Sheepdog I/O handling:
 *
 * 1. In sd_co_rw_vector, we send the I/O requests to the server and
 *    link the requests to the inflight list (inflight_aio_head) in the
 *    BDRVSheepdogState.  The function exits without waiting for the
 *    response.
 *
 * 2. We receive the response in aio_read_response, the fd handler for
 *    the sheepdog connection.  If a metadata update is needed, we send
 *    the write request to the vdi object in sd_write_done, the write
 *    completion function.  We switch back to sd_co_readv/writev after
 *    all the requests belonging to the AIOCB are finished.
 */
380

    
381
/*
 * Allocate an AIOReq for `acb' and fill it in from the arguments.
 *
 * The request gets the next sequence number of `s' as its id and the
 * pending count of `acb' is incremented.  The request is not linked into
 * any list here.
 */
static inline AIOReq *alloc_aio_req(BDRVSheepdogState *s, SheepdogAIOCB *acb,
                                    uint64_t oid, unsigned int data_len,
                                    uint64_t offset, uint8_t flags,
                                    uint64_t base_oid, unsigned int iov_offset)
{
    AIOReq *req = g_malloc(sizeof(*req));

    /* ownership and bookkeeping */
    req->aiocb = acb;
    req->id = s->aioreq_seq_num++;
    acb->nr_pending++;

    /* what to transfer and where */
    req->oid = oid;
    req->base_oid = base_oid;
    req->offset = offset;
    req->data_len = data_len;
    req->iov_offset = iov_offset;
    req->flags = flags;

    return req;
}
401

    
402
/*
 * Unlink `aio_req' from the list it is on, free it and decrement the
 * pending count of the AIOCB it belonged to.
 */
static inline void free_aio_req(BDRVSheepdogState *s, AIOReq *aio_req)
{
    /* fetch the owner first, aio_req is gone after g_free() */
    SheepdogAIOCB *owner = aio_req->aiocb;

    owner->nr_pending--;

    QLIST_REMOVE(aio_req, aio_siblings);
    g_free(aio_req);
}
411

    
412
/*
 * Finish `acb': re-enter its coroutine unless the AIOCB was canceled,
 * then release the AIOCB.
 */
static void coroutine_fn sd_finish_aiocb(SheepdogAIOCB *acb)
{
    if (acb->canceled) {
        /* sd_aio_cancel() already entered the coroutine */
        qemu_aio_release(acb);
        return;
    }

    qemu_coroutine_enter(acb->coroutine, NULL);
    qemu_aio_release(acb);
}
419

    
420
/*
 * Cancel callback of sd_aiocb_info.
 *
 * Sheepdog cannot cancel the requests which are already sent to the
 * servers, so the request is completed with -EIO here: the result is
 * set, the waiting coroutine is entered, and only afterwards the AIOCB
 * is flagged as canceled.
 */
static void sd_aio_cancel(BlockDriverAIOCB *blockacb)
{
    SheepdogAIOCB *sd_acb = (SheepdogAIOCB *)blockacb;

    sd_acb->ret = -EIO;
    qemu_coroutine_enter(sd_acb->coroutine, NULL);
    sd_acb->canceled = true;
}
432

    
433
static const AIOCBInfo sd_aiocb_info = {
434
    .aiocb_size = sizeof(SheepdogAIOCB),
435
    .cancel = sd_aio_cancel,
436
};
437

    
438
/*
 * Obtain a SheepdogAIOCB for `bs' and initialise it for an I/O covering
 * `nb_sectors' sectors starting at `sector_num' with the buffers in
 * `qiov'.  The calling coroutine is recorded in the AIOCB.
 *
 * aiocb_type and aio_done_func are not chosen here; aio_done_func starts
 * out as NULL.
 */
static SheepdogAIOCB *sd_aio_setup(BlockDriverState *bs, QEMUIOVector *qiov,
                                   int64_t sector_num, int nb_sectors)
{
    SheepdogAIOCB *acb = qemu_aio_get(&sd_aiocb_info, bs, NULL, NULL);

    /* describe the I/O */
    acb->qiov = qiov;
    acb->sector_num = sector_num;
    acb->nb_sectors = nb_sectors;

    /* completion state */
    acb->ret = 0;
    acb->nr_pending = 0;
    acb->canceled = false;
    acb->aio_done_func = NULL;
    acb->coroutine = qemu_coroutine_self();

    return acb;
}
457

    
458
/*
 * Open a connection to the server named by s->host_spec, over a unix
 * socket if s->is_unix is set and over inet otherwise.
 *
 * On success the socket is switched to non-blocking mode (and, for inet
 * connections, socket_set_nodelay() is applied; a failure of that call
 * is only reported).  Returns the value returned by the connect helper.
 */
static int connect_to_sdog(BDRVSheepdogState *s)
{
    Error *err = NULL;
    int fd;

    fd = s->is_unix ? unix_connect(s->host_spec, &err)
                    : inet_connect(s->host_spec, &err);

    if (err != NULL) {
        qerror_report_err(err);
        error_free(err);
        return fd;
    }

    if (!s->is_unix) {
        if (socket_set_nodelay(fd) < 0) {
            error_report("%s", strerror(errno));
        }
    }

    qemu_set_nonblock(fd);
    return fd;
}
485

    
486
/*
 * Send the request header `hdr' followed by `*wlen' bytes of `data' on
 * `sockfd'.
 *
 * Returns the byte count of the data send (>= 0) on success and a
 * negative value on failure.  The result of qemu_co_send() is checked
 * for a negative value before it is compared with the unsigned expected
 * length; comparing the two directly would convert a negative result to
 * a huge unsigned number and hide the error.  A short send is reported
 * as a failure (negative result, -EIO if qemu_co_send() itself returned
 * a non-negative count) so that callers testing `ret < 0' notice it.
 * NOTE(review): assumes qemu_co_send() only returns a short count on
 * error -- confirm against its implementation.
 */
static coroutine_fn int send_co_req(int sockfd, SheepdogReq *hdr, void *data,
                                    unsigned int *wlen)
{
    int ret;

    ret = qemu_co_send(sockfd, hdr, sizeof(*hdr));
    if (ret < 0 || (size_t)ret != sizeof(*hdr)) {
        error_report("failed to send a req, %s", strerror(errno));
        /* do not send the payload after a broken header */
        return ret < 0 ? ret : -EIO;
    }

    ret = qemu_co_send(sockfd, data, *wlen);
    if (ret < 0 || (unsigned int)ret != *wlen) {
        error_report("failed to send a req, %s", strerror(errno));
        return ret < 0 ? ret : -EIO;
    }

    return ret;
}
504

    
505
static void restart_co_req(void *opaque)
506
{
507
    Coroutine *co = opaque;
508

    
509
    qemu_coroutine_enter(co, NULL);
510
}
511

    
512
/*
 * Flush callback installed by do_co_req().  The handler is set only
 * while there is a pending request, so it always returns 1.
 */
static int have_co_req(void *opaque)
{
    return 1;
}
518

    
519
typedef struct SheepdogReqCo {
520
    int sockfd;
521
    SheepdogReq *hdr;
522
    void *data;
523
    unsigned int *wlen;
524
    unsigned int *rlen;
525
    int ret;
526
    bool finished;
527
} SheepdogReqCo;
528

    
529
static coroutine_fn void do_co_req(void *opaque)
530
{
531
    int ret;
532
    Coroutine *co;
533
    SheepdogReqCo *srco = opaque;
534
    int sockfd = srco->sockfd;
535
    SheepdogReq *hdr = srco->hdr;
536
    void *data = srco->data;
537
    unsigned int *wlen = srco->wlen;
538
    unsigned int *rlen = srco->rlen;
539

    
540
    co = qemu_coroutine_self();
541
    qemu_aio_set_fd_handler(sockfd, NULL, restart_co_req, have_co_req, co);
542

    
543
    ret = send_co_req(sockfd, hdr, data, wlen);
544
    if (ret < 0) {
545
        goto out;
546
    }
547

    
548
    qemu_aio_set_fd_handler(sockfd, restart_co_req, NULL, have_co_req, co);
549

    
550
    ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
551
    if (ret < sizeof(*hdr)) {
552
        error_report("failed to get a rsp, %s", strerror(errno));
553
        ret = -errno;
554
        goto out;
555
    }
556

    
557
    if (*rlen > hdr->data_length) {
558
        *rlen = hdr->data_length;
559
    }
560

    
561
    if (*rlen) {
562
        ret = qemu_co_recv(sockfd, data, *rlen);
563
        if (ret < *rlen) {
564
            error_report("failed to get the data, %s", strerror(errno));
565
            ret = -errno;
566
            goto out;
567
        }
568
    }
569
    ret = 0;
570
out:
571
    /* there is at most one request for this sockfd, so it is safe to
572
     * set each handler to NULL. */
573
    qemu_aio_set_fd_handler(sockfd, NULL, NULL, NULL, NULL);
574

    
575
    srco->ret = ret;
576
    srco->finished = true;
577
}
578

    
579
/*
 * Perform one request/response exchange on `sockfd' and return the
 * result stored by do_co_req().
 *
 * When already running in a coroutine do_co_req() is called directly;
 * otherwise a coroutine is created for it and qemu_aio_wait() is called
 * until the exchange has finished.
 */
static int do_req(int sockfd, SheepdogReq *hdr, void *data,
                  unsigned int *wlen, unsigned int *rlen)
{
    SheepdogReqCo srco = {
        .sockfd = sockfd,
        .hdr = hdr,
        .data = data,
        .wlen = wlen,
        .rlen = rlen,
        .ret = 0,
        .finished = false,
    };

    if (!qemu_in_coroutine()) {
        Coroutine *co = qemu_coroutine_create(do_co_req);

        qemu_coroutine_enter(co, &srco);
        while (!srco.finished) {
            qemu_aio_wait();
        }
    } else {
        do_co_req(&srco);
    }

    return srco.ret;
}
605

    
606
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
607
                           struct iovec *iov, int niov, bool create,
608
                           enum AIOCBState aiocb_type);
609
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req);
610

    
611

    
612
/*
 * Return the first request on the pending list of `s' that targets the
 * object `oid', or NULL if there is none.
 */
static AIOReq *find_pending_req(BDRVSheepdogState *s, uint64_t oid)
{
    AIOReq *candidate;

    QLIST_FOREACH(candidate, &s->pending_aio_head, aio_siblings) {
        if (candidate->oid != oid) {
            continue;
        }
        return candidate;
    }

    return NULL;
}
624

    
625
/*
626
 * This function searchs pending requests to the object `oid', and
627
 * sends them.
628
 */
629
static void coroutine_fn send_pending_req(BDRVSheepdogState *s, uint64_t oid)
630
{
631
    AIOReq *aio_req;
632
    SheepdogAIOCB *acb;
633
    int ret;
634

    
635
    while ((aio_req = find_pending_req(s, oid)) != NULL) {
636
        acb = aio_req->aiocb;
637
        /* move aio_req from pending list to inflight one */
638
        QLIST_REMOVE(aio_req, aio_siblings);
639
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
640
        ret = add_aio_request(s, aio_req, acb->qiov->iov,
641
                              acb->qiov->niov, false, acb->aiocb_type);
642
        if (ret < 0) {
643
            error_report("add_aio_request is failed");
644
            free_aio_req(s, aio_req);
645
            if (!acb->nr_pending) {
646
                sd_finish_aiocb(acb);
647
            }
648
        }
649
    }
650
}
651

    
652
/*
653
 * Receive responses of the I/O requests.
654
 *
655
 * This function is registered as a fd handler, and called from the
656
 * main loop when s->fd is ready for reading responses.
657
 */
658
static void coroutine_fn aio_read_response(void *opaque)
659
{
660
    SheepdogObjRsp rsp;
661
    BDRVSheepdogState *s = opaque;
662
    int fd = s->fd;
663
    int ret;
664
    AIOReq *aio_req = NULL;
665
    SheepdogAIOCB *acb;
666
    uint64_t idx;
667

    
668
    if (QLIST_EMPTY(&s->inflight_aio_head)) {
669
        goto out;
670
    }
671

    
672
    /* read a header */
673
    ret = qemu_co_recv(fd, &rsp, sizeof(rsp));
674
    if (ret < 0) {
675
        error_report("failed to get the header, %s", strerror(errno));
676
        goto out;
677
    }
678

    
679
    /* find the right aio_req from the inflight aio list */
680
    QLIST_FOREACH(aio_req, &s->inflight_aio_head, aio_siblings) {
681
        if (aio_req->id == rsp.id) {
682
            break;
683
        }
684
    }
685
    if (!aio_req) {
686
        error_report("cannot find aio_req %x", rsp.id);
687
        goto out;
688
    }
689

    
690
    acb = aio_req->aiocb;
691

    
692
    switch (acb->aiocb_type) {
693
    case AIOCB_WRITE_UDATA:
694
        /* this coroutine context is no longer suitable for co_recv
695
         * because we may send data to update vdi objects */
696
        s->co_recv = NULL;
697
        if (!is_data_obj(aio_req->oid)) {
698
            break;
699
        }
700
        idx = data_oid_to_idx(aio_req->oid);
701

    
702
        if (s->inode.data_vdi_id[idx] != s->inode.vdi_id) {
703
            /*
704
             * If the object is newly created one, we need to update
705
             * the vdi object (metadata object).  min_dirty_data_idx
706
             * and max_dirty_data_idx are changed to include updated
707
             * index between them.
708
             */
709
            if (rsp.result == SD_RES_SUCCESS) {
710
                s->inode.data_vdi_id[idx] = s->inode.vdi_id;
711
                s->max_dirty_data_idx = MAX(idx, s->max_dirty_data_idx);
712
                s->min_dirty_data_idx = MIN(idx, s->min_dirty_data_idx);
713
            }
714
            /*
715
             * Some requests may be blocked because simultaneous
716
             * create requests are not allowed, so we search the
717
             * pending requests here.
718
             */
719
            send_pending_req(s, aio_req->oid);
720
        }
721
        break;
722
    case AIOCB_READ_UDATA:
723
        ret = qemu_co_recvv(fd, acb->qiov->iov, acb->qiov->niov,
724
                            aio_req->iov_offset, rsp.data_length);
725
        if (ret < 0) {
726
            error_report("failed to get the data, %s", strerror(errno));
727
            goto out;
728
        }
729
        break;
730
    case AIOCB_FLUSH_CACHE:
731
        if (rsp.result == SD_RES_INVALID_PARMS) {
732
            dprintf("disable cache since the server doesn't support it\n");
733
            s->cache_flags = SD_FLAG_CMD_DIRECT;
734
            rsp.result = SD_RES_SUCCESS;
735
        }
736
        break;
737
    case AIOCB_DISCARD_OBJ:
738
        switch (rsp.result) {
739
        case SD_RES_INVALID_PARMS:
740
            error_report("sheep(%s) doesn't support discard command",
741
                         s->host_spec);
742
            rsp.result = SD_RES_SUCCESS;
743
            s->discard_supported = false;
744
            break;
745
        case SD_RES_SUCCESS:
746
            idx = data_oid_to_idx(aio_req->oid);
747
            s->inode.data_vdi_id[idx] = 0;
748
            break;
749
        default:
750
            break;
751
        }
752
    }
753

    
754
    switch (rsp.result) {
755
    case SD_RES_SUCCESS:
756
        break;
757
    case SD_RES_READONLY:
758
        ret = resend_aioreq(s, aio_req);
759
        if (ret == SD_RES_SUCCESS) {
760
            goto out;
761
        }
762
        /* fall through */
763
    default:
764
        acb->ret = -EIO;
765
        error_report("%s", sd_strerror(rsp.result));
766
        break;
767
    }
768

    
769
    free_aio_req(s, aio_req);
770
    if (!acb->nr_pending) {
771
        /*
772
         * We've finished all requests which belong to the AIOCB, so
773
         * we can switch back to sd_co_readv/writev now.
774
         */
775
        acb->aio_done_func(acb);
776
    }
777
out:
778
    s->co_recv = NULL;
779
}
780

    
781
static void co_read_response(void *opaque)
782
{
783
    BDRVSheepdogState *s = opaque;
784

    
785
    if (!s->co_recv) {
786
        s->co_recv = qemu_coroutine_create(aio_read_response);
787
    }
788

    
789
    qemu_coroutine_enter(s->co_recv, opaque);
790
}
791

    
792
static void co_write_request(void *opaque)
793
{
794
    BDRVSheepdogState *s = opaque;
795

    
796
    qemu_coroutine_enter(s->co_send, NULL);
797
}
798

    
799
static int aio_flush_request(void *opaque)
800
{
801
    BDRVSheepdogState *s = opaque;
802

    
803
    return !QLIST_EMPTY(&s->inflight_aio_head) ||
804
        !QLIST_EMPTY(&s->pending_aio_head);
805
}
806

    
807
/*
808
 * Return a socket discriptor to read/write objects.
809
 *
810
 * We cannot use this discriptor for other operations because
811
 * the block driver may be on waiting response from the server.
812
 */
813
static int get_sheep_fd(BDRVSheepdogState *s)
814
{
815
    int fd;
816

    
817
    fd = connect_to_sdog(s);
818
    if (fd < 0) {
819
        return fd;
820
    }
821

    
822
    qemu_aio_set_fd_handler(fd, co_read_response, NULL, aio_flush_request, s);
823
    return fd;
824
}
825

    
826
/*
 * Parse a sheepdog URI.
 *
 * Accepted forms:
 *   sheepdog[+tcp]://[host:port]/vdiname[#snapid|#tag]
 *   sheepdog+unix:///vdiname?socket=path[#snapid|#tag]
 *
 * On success s->is_unix and s->host_spec (newly allocated) are set, the
 * vdi name is copied to `vdi' (at most SD_MAX_VDI_LEN bytes) and the
 * fragment, if any, is stored either as a number in `*snapid' or, when
 * it does not parse to a non-zero number, as a string in `tag' (at most
 * SD_MAX_VDI_TAG_LEN bytes).  Without a fragment `*snapid' is set to
 * CURRENT_VDI_ID and `tag' is left untouched.
 *
 * Returns 0 on success and -EINVAL for anything that cannot be parsed,
 * including a URI without a scheme (which previously reached strcmp()
 * with a NULL pointer).
 */
static int sd_parse_uri(BDRVSheepdogState *s, const char *filename,
                        char *vdi, uint32_t *snapid, char *tag)
{
    URI *uri;
    QueryParams *qp = NULL;
    int ret = 0;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport */
    if (!uri->scheme) {
        ret = -EINVAL;
        goto out;
    } else if (!strcmp(uri->scheme, "sheepdog")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+tcp")) {
        s->is_unix = false;
    } else if (!strcmp(uri->scheme, "sheepdog+unix")) {
        s->is_unix = true;
    } else {
        ret = -EINVAL;
        goto out;
    }

    if (uri->path == NULL || !strcmp(uri->path, "/")) {
        ret = -EINVAL;
        goto out;
    }
    /* skip the leading '/' of the path */
    pstrcpy(vdi, SD_MAX_VDI_LEN, uri->path + 1);

    /* unix transport takes exactly one query parameter, tcp takes none */
    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (s->is_unix && !qp->n) || (!s->is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (s->is_unix) {
        /* sheepdog+unix:///vdiname?socket=path */
        if (uri->server || uri->port || strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        s->host_spec = g_strdup(qp->p[0].value);
    } else {
        /* sheepdog[+tcp]://[host:port]/vdiname */
        s->host_spec = g_strdup_printf("%s:%d", uri->server ?: SD_DEFAULT_ADDR,
                                       uri->port ?: SD_DEFAULT_PORT);
    }

    /* snapshot tag */
    if (uri->fragment) {
        *snapid = strtoul(uri->fragment, NULL, 10);
        if (*snapid == 0) {
            /* not a (non-zero) number: treat the fragment as a tag */
            pstrcpy(tag, SD_MAX_VDI_TAG_LEN, uri->fragment);
        }
    } else {
        *snapid = CURRENT_VDI_ID; /* search current vdi */
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
892

    
893
/*
894
 * Parse a filename (old syntax)
895
 *
896
 * filename must be one of the following formats:
897
 *   1. [vdiname]
898
 *   2. [vdiname]:[snapid]
899
 *   3. [vdiname]:[tag]
900
 *   4. [hostname]:[port]:[vdiname]
901
 *   5. [hostname]:[port]:[vdiname]:[snapid]
902
 *   6. [hostname]:[port]:[vdiname]:[tag]
903
 *
904
 * You can boot from the snapshot images by specifying `snapid` or
905
 * `tag'.
906
 *
907
 * You can run VMs outside the Sheepdog cluster by specifying
908
 * `hostname' and `port' (experimental).
909
 */
910
static int parse_vdiname(BDRVSheepdogState *s, const char *filename,
911
                         char *vdi, uint32_t *snapid, char *tag)
912
{
913
    char *p, *q, *uri;
914
    const char *host_spec, *vdi_spec;
915
    int nr_sep, ret;
916

    
917
    strstart(filename, "sheepdog:", (const char **)&filename);
918
    p = q = g_strdup(filename);
919

    
920
    /* count the number of separators */
921
    nr_sep = 0;
922
    while (*p) {
923
        if (*p == ':') {
924
            nr_sep++;
925
        }
926
        p++;
927
    }
928
    p = q;
929

    
930
    /* use the first two tokens as host_spec. */
931
    if (nr_sep >= 2) {
932
        host_spec = p;
933
        p = strchr(p, ':');
934
        p++;
935
        p = strchr(p, ':');
936
        *p++ = '\0';
937
    } else {
938
        host_spec = "";
939
    }
940

    
941
    vdi_spec = p;
942

    
943
    p = strchr(vdi_spec, ':');
944
    if (p) {
945
        *p++ = '#';
946
    }
947

    
948
    uri = g_strdup_printf("sheepdog://%s/%s", host_spec, vdi_spec);
949

    
950
    ret = sd_parse_uri(s, uri, vdi, snapid, tag);
951

    
952
    g_free(q);
953
    g_free(uri);
954

    
955
    return ret;
956
}
957

    
958
/*
 * Ask the server for the id of the vdi named `filename' with snapshot id
 * `snapid' / tag `tag', and store it in `*vid'.
 *
 * With `lock' set the lookup is sent as SD_OP_LOCK_VDI, otherwise as
 * SD_OP_GET_VDI_INFO.  A dedicated connection is opened for the request
 * and closed again before returning.
 *
 * Returns 0 on success, the negative value from connect_to_sdog() if the
 * connection fails, the do_req() result if the exchange fails, -ENOENT
 * if the server reports SD_RES_NO_VDI and -EIO for any other server
 * error.
 */
static int find_vdi_name(BDRVSheepdogState *s, const char *filename,
                         uint32_t snapid, const char *tag, uint32_t *vid,
                         bool lock)
{
    char buf[SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN];
    SheepdogVdiReq hdr;
    /* the response is received into the request header buffer */
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen = SD_MAX_VDI_LEN + SD_MAX_VDI_TAG_LEN;
    unsigned int rlen = 0;
    int fd, ret;

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    /* This pair of strncpy calls ensures that the buffer is zero-filled,
     * which is desirable since we'll soon be sending those bytes, and
     * don't want the send_req to read uninitialized data.
     */
    strncpy(buf, filename, SD_MAX_VDI_LEN);
    strncpy(buf + SD_MAX_VDI_LEN, tag, SD_MAX_VDI_TAG_LEN);

    memset(&hdr, 0, sizeof(hdr));
    hdr.proto_ver = SD_PROTO_VER;
    hdr.opcode = lock ? SD_OP_LOCK_VDI : SD_OP_GET_VDI_INFO;
    hdr.flags = SD_FLAG_CMD_WRITE;
    hdr.data_length = wlen;
    hdr.snapid = snapid;

    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
    if (ret) {
        goto out;
    }

    if (rsp->result != SD_RES_SUCCESS) {
        error_report("cannot get vdi info, %s, %s %d %s",
                     sd_strerror(rsp->result), filename, snapid, tag);
        ret = (rsp->result == SD_RES_NO_VDI) ? -ENOENT : -EIO;
        goto out;
    }

    *vid = rsp->vdi_id;
    ret = 0;
out:
    closesocket(fd);
    return ret;
}
1014

    
1015
/*
 * Build an object request header for @aio_req and send it (plus the
 * payload for writes) on the shared socket s->fd.
 *
 * @iov/@niov describe the caller's data; only writes send data, starting
 * at aio_req->iov_offset.  @create selects SD_OP_CREATE_AND_WRITE_OBJ
 * instead of SD_OP_WRITE_OBJ for AIOCB_WRITE_UDATA requests.
 *
 * s->lock is held for the whole send.  The socket is corked and the
 * write handler installed while sending; both are undone on every exit
 * path, including failures.
 *
 * Returns 0 on success or a negative errno value when sending fails.
 */
static int coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
                           struct iovec *iov, int niov, bool create,
                           enum AIOCBState aiocb_type)
{
    int nr_copies = s->inode.nr_copies;
    SheepdogObjReq hdr;
    unsigned int wlen = 0;
    int ret;
    int saved_errno;
    uint64_t oid = aio_req->oid;
    unsigned int datalen = aio_req->data_len;
    uint64_t offset = aio_req->offset;
    uint8_t flags = aio_req->flags;
    uint64_t old_oid = aio_req->base_oid;

    if (!nr_copies) {
        error_report("bug");
    }

    memset(&hdr, 0, sizeof(hdr));

    switch (aiocb_type) {
    case AIOCB_FLUSH_CACHE:
        hdr.opcode = SD_OP_FLUSH_VDI;
        break;
    case AIOCB_READ_UDATA:
        hdr.opcode = SD_OP_READ_OBJ;
        hdr.flags = flags;
        break;
    case AIOCB_WRITE_UDATA:
        if (create) {
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
        } else {
            hdr.opcode = SD_OP_WRITE_OBJ;
        }
        wlen = datalen;
        hdr.flags = SD_FLAG_CMD_WRITE | flags;
        break;
    case AIOCB_DISCARD_OBJ:
        hdr.opcode = SD_OP_DISCARD_OBJ;
        break;
    }

    if (s->cache_flags) {
        hdr.flags |= s->cache_flags;
    }

    hdr.oid = oid;
    hdr.cow_oid = old_oid;
    hdr.copies = s->inode.nr_copies;

    hdr.data_length = datalen;
    hdr.offset = offset;

    hdr.id = aio_req->id;

    qemu_co_mutex_lock(&s->lock);
    s->co_send = qemu_coroutine_self();
    qemu_aio_set_fd_handler(s->fd, co_read_response, co_write_request,
                            aio_flush_request, s);
    socket_set_cork(s->fd, 1);

    /* send a header */
    ret = qemu_co_send(s->fd, &hdr, sizeof(hdr));
    if (ret < 0) {
        /* Capture errno before any other call has a chance to change it. */
        saved_errno = errno;
        error_report("failed to send a req, %s", strerror(saved_errno));
        ret = saved_errno ? -saved_errno : -EIO;
        goto out;
    }

    if (wlen) {
        ret = qemu_co_sendv(s->fd, iov, niov, aio_req->iov_offset, wlen);
        if (ret < 0) {
            saved_errno = errno;
            error_report("failed to send a data, %s", strerror(saved_errno));
            ret = saved_errno ? -saved_errno : -EIO;
            goto out;
        }
    }

    ret = 0;
out:
    /*
     * Common exit: uncork the socket and drop the write handler so a
     * failed send does not leave the connection in "sending" state.
     */
    socket_set_cork(s->fd, 0);
    qemu_aio_set_fd_handler(s->fd, co_read_response, NULL,
                            aio_flush_request, s);
    qemu_co_mutex_unlock(&s->lock);

    return ret;
}
1100

    
1101
static int read_write_object(int fd, char *buf, uint64_t oid, int copies,
1102
                             unsigned int datalen, uint64_t offset,
1103
                             bool write, bool create, uint32_t cache_flags)
1104
{
1105
    SheepdogObjReq hdr;
1106
    SheepdogObjRsp *rsp = (SheepdogObjRsp *)&hdr;
1107
    unsigned int wlen, rlen;
1108
    int ret;
1109

    
1110
    memset(&hdr, 0, sizeof(hdr));
1111

    
1112
    if (write) {
1113
        wlen = datalen;
1114
        rlen = 0;
1115
        hdr.flags = SD_FLAG_CMD_WRITE;
1116
        if (create) {
1117
            hdr.opcode = SD_OP_CREATE_AND_WRITE_OBJ;
1118
        } else {
1119
            hdr.opcode = SD_OP_WRITE_OBJ;
1120
        }
1121
    } else {
1122
        wlen = 0;
1123
        rlen = datalen;
1124
        hdr.opcode = SD_OP_READ_OBJ;
1125
    }
1126

    
1127
    hdr.flags |= cache_flags;
1128

    
1129
    hdr.oid = oid;
1130
    hdr.data_length = datalen;
1131
    hdr.offset = offset;
1132
    hdr.copies = copies;
1133

    
1134
    ret = do_req(fd, (SheepdogReq *)&hdr, buf, &wlen, &rlen);
1135
    if (ret) {
1136
        error_report("failed to send a request to the sheep");
1137
        return ret;
1138
    }
1139

    
1140
    switch (rsp->result) {
1141
    case SD_RES_SUCCESS:
1142
        return 0;
1143
    default:
1144
        error_report("%s", sd_strerror(rsp->result));
1145
        return -EIO;
1146
    }
1147
}
1148

    
1149
/*
 * Read @datalen bytes at @offset of object @oid into @buf.
 * Thin wrapper around read_write_object() with write and create off.
 */
static int read_object(int fd, char *buf, uint64_t oid, int copies,
                       unsigned int datalen, uint64_t offset,
                       uint32_t cache_flags)
{
    const bool is_write = false;
    const bool is_create = false;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, is_create, cache_flags);
}
1156

    
1157
/*
 * Write @datalen bytes from @buf at @offset of object @oid.
 * Thin wrapper around read_write_object() with write on; @create is
 * forwarded unchanged.
 */
static int write_object(int fd, char *buf, uint64_t oid, int copies,
                        unsigned int datalen, uint64_t offset, bool create,
                        uint32_t cache_flags)
{
    const bool is_write = true;

    return read_write_object(fd, buf, oid, copies, datalen, offset,
                             is_write, create, cache_flags);
}
1164

    
1165
/* update inode with the latest state */
1166
static int reload_inode(BDRVSheepdogState *s, uint32_t snapid, const char *tag)
1167
{
1168
    SheepdogInode *inode;
1169
    int ret = 0, fd;
1170
    uint32_t vid = 0;
1171

    
1172
    fd = connect_to_sdog(s);
1173
    if (fd < 0) {
1174
        return -EIO;
1175
    }
1176

    
1177
    inode = g_malloc(sizeof(s->inode));
1178

    
1179
    ret = find_vdi_name(s, s->name, snapid, tag, &vid, false);
1180
    if (ret) {
1181
        goto out;
1182
    }
1183

    
1184
    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(vid),
1185
                      s->inode.nr_copies, sizeof(*inode), 0, s->cache_flags);
1186
    if (ret < 0) {
1187
        goto out;
1188
    }
1189

    
1190
    if (inode->vdi_id != s->inode.vdi_id) {
1191
        memcpy(&s->inode, inode, sizeof(s->inode));
1192
    }
1193

    
1194
out:
1195
    g_free(inode);
1196
    closesocket(fd);
1197

    
1198
    return ret;
1199
}
1200

    
1201
/*
 * Re-issue @aio_req after refreshing the inode.
 *
 * The request's object id is rebuilt from the (possibly new) vdi_id.
 * For writes it is then decided whether the object has to be created
 * (unallocated index) or copied-on-write (allocated but not writable).
 * A CoW request that collides with another in-flight request on the
 * same object is moved to the pending list instead of being sent.
 *
 * Returns a negative value if reload_inode() fails, SD_RES_SUCCESS when
 * the request was parked on the pending list, otherwise the result of
 * add_aio_request().
 */
static int coroutine_fn resend_aioreq(BDRVSheepdogState *s, AIOReq *aio_req)
{
    SheepdogAIOCB *acb = aio_req->aiocb;
    bool need_create = false;
    int ret;

    ret = reload_inode(s, 0, "");
    if (ret < 0) {
        return ret;
    }

    aio_req->oid = vid_to_data_oid(s->inode.vdi_id,
                                   data_oid_to_idx(aio_req->oid));

    /* check whether this request becomes a CoW one */
    if (acb->aiocb_type == AIOCB_WRITE_UDATA) {
        int idx = data_oid_to_idx(aio_req->oid);

        if (s->inode.data_vdi_id[idx] == 0) {
            /* nothing allocated at this index yet: plain create */
            need_create = true;
        } else if (!is_data_obj_writable(&s->inode, idx)) {
            AIOReq *other;

            /* link to the pending list if there is another CoW request
             * to the same object */
            QLIST_FOREACH(other, &s->inflight_aio_head, aio_siblings) {
                if (other != aio_req && other->oid == aio_req->oid) {
                    dprintf("simultaneous CoW to %" PRIx64 "\n", aio_req->oid);
                    QLIST_REMOVE(aio_req, aio_siblings);
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
                                      aio_siblings);
                    return SD_RES_SUCCESS;
                }
            }

            aio_req->base_oid = vid_to_data_oid(s->inode.data_vdi_id[idx],
                                                idx);
            aio_req->flags |= SD_FLAG_CMD_COW;
            need_create = true;
        }
    }

    return add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
                           need_create, acb->aiocb_type);
}
1247

    
1248
/* TODO Convert to fine grained options */
1249
static QemuOptsList runtime_opts = {
1250
    .name = "sheepdog",
1251
    .head = QTAILQ_HEAD_INITIALIZER(runtime_opts.head),
1252
    .desc = {
1253
        {
1254
            .name = "filename",
1255
            .type = QEMU_OPT_STRING,
1256
            .help = "URL to the sheepdog image",
1257
        },
1258
        { /* end of list */ }
1259
    },
1260
};
1261

    
1262
/*
 * Open a Sheepdog VDI described by the "filename" runtime option.
 *
 * The filename is parsed either as a URI (when it contains "://") or in
 * the legacy vdiname syntax, the VDI is looked up with locking enabled,
 * and its inode is read into s->inode.
 *
 * Returns 0 on success or a negative value on failure.  On failure the
 * persistent socket s->fd, if it was opened, is deregistered and closed.
 */
static int sd_open(BlockDriverState *bs, QDict *options, int flags)
{
    int ret, fd;
    uint32_t vid = 0;
    BDRVSheepdogState *s = bs->opaque;
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
    uint32_t snapid;
    char *buf = NULL;
    QemuOpts *opts;
    Error *local_err = NULL;
    const char *filename;

    /*
     * The error path below inspects s->fd, so it has to hold a known
     * "not open" value before the first possible goto.
     */
    s->fd = -1;
    QLIST_INIT(&s->inflight_aio_head);
    QLIST_INIT(&s->pending_aio_head);

    opts = qemu_opts_create_nofail(&runtime_opts);
    qemu_opts_absorb_qdict(opts, options, &local_err);
    if (error_is_set(&local_err)) {
        qerror_report_err(local_err);
        error_free(local_err);
        ret = -EINVAL;
        goto out;
    }

    filename = qemu_opt_get(opts, "filename");
    if (!filename) {
        /* strstr() below must not be handed a NULL pointer */
        error_report("sheepdog: no filename given");
        ret = -EINVAL;
        goto out;
    }

    memset(vdi, 0, sizeof(vdi));
    memset(tag, 0, sizeof(tag));

    if (strstr(filename, "://")) {
        ret = sd_parse_uri(s, filename, vdi, &snapid, tag);
    } else {
        ret = parse_vdiname(s, filename, vdi, &snapid, tag);
    }
    if (ret < 0) {
        goto out;
    }
    s->fd = get_sheep_fd(s);
    if (s->fd < 0) {
        ret = s->fd;
        goto out;
    }

    ret = find_vdi_name(s, vdi, snapid, tag, &vid, true);
    if (ret) {
        goto out;
    }

    /*
     * QEMU block layer emulates writethrough cache as 'writeback + flush', so
     * we always set SD_FLAG_CMD_CACHE (writeback cache) as default.
     */
    s->cache_flags = SD_FLAG_CMD_CACHE;
    if (flags & BDRV_O_NOCACHE) {
        s->cache_flags = SD_FLAG_CMD_DIRECT;
    }
    s->discard_supported = true;

    if (snapid || tag[0] != '\0') {
        dprintf("%" PRIx32 " snapshot inode was open.\n", vid);
        s->is_snapshot = true;
    }

    /* The inode is fetched over a separate short-lived connection. */
    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    buf = g_malloc(SD_INODE_SIZE);
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), 0, SD_INODE_SIZE, 0,
                      s->cache_flags);

    closesocket(fd);

    if (ret) {
        goto out;
    }

    memcpy(&s->inode, buf, sizeof(s->inode));
    /* min > max marks the dirty index range as empty */
    s->min_dirty_data_idx = UINT32_MAX;
    s->max_dirty_data_idx = 0;

    bs->total_sectors = s->inode.vdi_size / BDRV_SECTOR_SIZE;
    pstrcpy(s->name, sizeof(s->name), vdi);
    qemu_co_mutex_init(&s->lock);
    qemu_opts_del(opts);
    g_free(buf);
    return 0;
out:
    if (s->fd >= 0) {
        qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
        closesocket(s->fd);
    }
    qemu_opts_del(opts);
    g_free(buf);
    return ret;
}
1361

    
1362
/*
 * Send an SD_OP_NEW_VDI request for a VDI called @filename.
 *
 * @vdi_size, @base_vid and @snapshot are copied into the request header
 * (as vdi_size, vdi_id and snapid respectively).  When @vdi_id is not
 * NULL it receives the id returned by the server.
 *
 * Returns 0 on success, the connect_to_sdog()/do_req() result if either
 * fails, or -EIO when the server reports an error.
 */
static int do_sd_create(BDRVSheepdogState *s, char *filename, int64_t vdi_size,
                        uint32_t base_vid, uint32_t *vdi_id, int snapshot)
{
    char name[SD_MAX_VDI_LEN];
    unsigned int wlen = SD_MAX_VDI_LEN;
    unsigned int rlen = 0;
    SheepdogVdiReq hdr;
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    int sock, ret;

    sock = connect_to_sdog(s);
    if (sock < 0) {
        return sock;
    }

    /* FIXME: would it be better to fail (e.g., return -EIO) when filename
     * does not fit in the buffer?  For now, just truncate and avoid a
     * buffer overrun.
     */
    memset(name, 0, sizeof(name));
    pstrcpy(name, sizeof(name), filename);

    memset(&hdr, 0, sizeof(hdr));
    hdr.opcode = SD_OP_NEW_VDI;
    hdr.vdi_id = base_vid;
    hdr.flags = SD_FLAG_CMD_WRITE;
    hdr.snapid = snapshot;
    hdr.data_length = wlen;
    hdr.vdi_size = vdi_size;

    ret = do_req(sock, (SheepdogReq *)&hdr, name, &wlen, &rlen);
    closesocket(sock);
    if (ret) {
        return ret;
    }

    if (rsp->result != SD_RES_SUCCESS) {
        error_report("%s, %s", sd_strerror(rsp->result), filename);
        return -EIO;
    }

    if (vdi_id != NULL) {
        *vdi_id = rsp->vdi_id;
    }
    return 0;
}
1413

    
1414
static int sd_prealloc(const char *filename)
1415
{
1416
    BlockDriverState *bs = NULL;
1417
    uint32_t idx, max_idx;
1418
    int64_t vdi_size;
1419
    void *buf = g_malloc0(SD_DATA_OBJ_SIZE);
1420
    int ret;
1421

    
1422
    ret = bdrv_file_open(&bs, filename, NULL, BDRV_O_RDWR);
1423
    if (ret < 0) {
1424
        goto out;
1425
    }
1426

    
1427
    vdi_size = bdrv_getlength(bs);
1428
    if (vdi_size < 0) {
1429
        ret = vdi_size;
1430
        goto out;
1431
    }
1432
    max_idx = DIV_ROUND_UP(vdi_size, SD_DATA_OBJ_SIZE);
1433

    
1434
    for (idx = 0; idx < max_idx; idx++) {
1435
        /*
1436
         * The created image can be a cloned image, so we need to read
1437
         * a data from the source image.
1438
         */
1439
        ret = bdrv_pread(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1440
        if (ret < 0) {
1441
            goto out;
1442
        }
1443
        ret = bdrv_pwrite(bs, idx * SD_DATA_OBJ_SIZE, buf, SD_DATA_OBJ_SIZE);
1444
        if (ret < 0) {
1445
            goto out;
1446
        }
1447
    }
1448
out:
1449
    if (bs) {
1450
        bdrv_delete(bs);
1451
    }
1452
    g_free(buf);
1453

    
1454
    return ret;
1455
}
1456

    
1457
/*
 * Create a new VDI named by @filename.
 *
 * Recognised @options: BLOCK_OPT_SIZE, BLOCK_OPT_BACKING_FILE (must be a
 * sheepdog snapshot VDI, used as clone base) and BLOCK_OPT_PREALLOC
 * ("off" or "full").
 *
 * Returns a negative value on failure; otherwise the result of
 * do_sd_create() or, with full preallocation, of sd_prealloc().
 */
static int sd_create(const char *filename, QEMUOptionParameter *options)
{
    char vdi[SD_MAX_VDI_LEN], tag[SD_MAX_VDI_TAG_LEN];
    uint32_t vid = 0, base_vid = 0;
    uint32_t snapid;
    int64_t vdi_size = 0;
    char *backing_file = NULL;
    bool prealloc = false;
    BDRVSheepdogState *s;
    int ret = 0;

    /* Temporary state: only needed to talk to the server in here. */
    s = g_malloc0(sizeof(BDRVSheepdogState));

    memset(vdi, 0, sizeof(vdi));
    memset(tag, 0, sizeof(tag));
    ret = strstr(filename, "://")
        ? sd_parse_uri(s, filename, vdi, &snapid, tag)
        : parse_vdiname(s, filename, vdi, &snapid, tag);
    if (ret < 0) {
        goto out;
    }

    for (; options && options->name; options++) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            vdi_size = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_BACKING_FILE)) {
            backing_file = options->value.s;
        } else if (!strcmp(options->name, BLOCK_OPT_PREALLOC)) {
            const char *mode = options->value.s;

            if (!mode || !strcmp(mode, "off")) {
                prealloc = false;
            } else if (!strcmp(mode, "full")) {
                prealloc = true;
            } else {
                error_report("Invalid preallocation mode: '%s'", mode);
                ret = -EINVAL;
                goto out;
            }
        }
    }

    if (vdi_size > SD_MAX_VDI_SIZE) {
        error_report("too big image size");
        ret = -EINVAL;
        goto out;
    }

    if (backing_file) {
        BlockDriverState *base_bs;
        BDRVSheepdogState *base_s;
        BlockDriver *drv = bdrv_find_protocol(backing_file);

        /* Currently, only Sheepdog backing image is supported. */
        if (!drv || strcmp(drv->protocol_name, "sheepdog") != 0) {
            error_report("backing_file must be a sheepdog image");
            ret = -EINVAL;
            goto out;
        }

        ret = bdrv_file_open(&base_bs, backing_file, NULL, 0);
        if (ret < 0) {
            goto out;
        }

        base_s = base_bs->opaque;
        if (!is_snapshot(&base_s->inode)) {
            error_report("cannot clone from a non snapshot vdi");
            bdrv_delete(base_bs);
            ret = -EINVAL;
            goto out;
        }

        base_vid = base_s->inode.vdi_id;
        bdrv_delete(base_bs);
    }

    ret = do_sd_create(s, vdi, vdi_size, base_vid, &vid, 0);
    if (!ret && prealloc) {
        ret = sd_prealloc(filename);
    }
out:
    g_free(s);
    return ret;
}
1548

    
1549
/*
 * Close the device: send SD_OP_RELEASE_VDI for the open VDI over a
 * temporary connection, then tear down the persistent socket and free
 * s->host_spec.
 *
 * The release request is best effort.  If the temporary connection
 * cannot be opened the request is skipped, but the local cleanup is
 * still performed so the socket and host_spec are not leaked.
 */
static void sd_close(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogVdiReq hdr;
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
    unsigned int wlen, rlen = 0;
    int fd, ret;

    dprintf("%s\n", s->name);

    fd = connect_to_sdog(s);
    if (fd >= 0) {
        memset(&hdr, 0, sizeof(hdr));

        hdr.opcode = SD_OP_RELEASE_VDI;
        hdr.vdi_id = s->inode.vdi_id;
        wlen = strlen(s->name) + 1;
        hdr.data_length = wlen;
        hdr.flags = SD_FLAG_CMD_WRITE;

        ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);

        closesocket(fd);

        /* SD_RES_VDI_NOT_LOCKED is deliberately not reported */
        if (!ret && rsp->result != SD_RES_SUCCESS &&
            rsp->result != SD_RES_VDI_NOT_LOCKED) {
            error_report("%s, %s", sd_strerror(rsp->result), s->name);
        }
    }

    qemu_aio_set_fd_handler(s->fd, NULL, NULL, NULL, NULL);
    closesocket(s->fd);
    g_free(s->host_spec);
}
1585

    
1586
/* Report the VDI size recorded in the cached inode. */
static int64_t sd_getlength(BlockDriverState *bs)
{
    const BDRVSheepdogState *state = bs->opaque;
    int64_t length = state->inode.vdi_size;

    return length;
}
1592

    
1593
/*
 * Grow the VDI to @offset bytes by rewriting the inode header.
 *
 * Shrinking and sizes above SD_MAX_VDI_SIZE are rejected with -EINVAL.
 * Returns the connect_to_sdog() result if the connection fails,
 * otherwise the result of write_object().
 */
static int sd_truncate(BlockDriverState *bs, int64_t offset)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    unsigned int datalen;

    if (offset < s->inode.vdi_size) {
        error_report("shrinking is not supported");
        return -EINVAL;
    } else if (offset > SD_MAX_VDI_SIZE) {
        error_report("too big image size");
        return -EINVAL;
    }

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);
    s->inode.vdi_size = offset;
    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    /* fd is a socket: release it the same way as everywhere else here */
    closesocket(fd);

    if (ret < 0) {
        error_report("failed to update an inode.");
    }

    return ret;
}
1625

    
1626
/*
1627
 * This function is called after writing data objects.  If we need to
1628
 * update metadata, this sends a write request to the vdi object.
1629
 * Otherwise, this switches back to sd_co_readv/writev.
1630
 */
1631
static void coroutine_fn sd_write_done(SheepdogAIOCB *acb)
1632
{
1633
    int ret;
1634
    BDRVSheepdogState *s = acb->common.bs->opaque;
1635
    struct iovec iov;
1636
    AIOReq *aio_req;
1637
    uint32_t offset, data_len, mn, mx;
1638

    
1639
    mn = s->min_dirty_data_idx;
1640
    mx = s->max_dirty_data_idx;
1641
    if (mn <= mx) {
1642
        /* we need to update the vdi object. */
1643
        offset = sizeof(s->inode) - sizeof(s->inode.data_vdi_id) +
1644
            mn * sizeof(s->inode.data_vdi_id[0]);
1645
        data_len = (mx - mn + 1) * sizeof(s->inode.data_vdi_id[0]);
1646

    
1647
        s->min_dirty_data_idx = UINT32_MAX;
1648
        s->max_dirty_data_idx = 0;
1649

    
1650
        iov.iov_base = &s->inode;
1651
        iov.iov_len = sizeof(s->inode);
1652
        aio_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
1653
                                data_len, offset, 0, 0, offset);
1654
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1655
        ret = add_aio_request(s, aio_req, &iov, 1, false, AIOCB_WRITE_UDATA);
1656
        if (ret) {
1657
            free_aio_req(s, aio_req);
1658
            acb->ret = -EIO;
1659
            goto out;
1660
        }
1661

    
1662
        acb->aio_done_func = sd_finish_aiocb;
1663
        acb->aiocb_type = AIOCB_WRITE_UDATA;
1664
        return;
1665
    }
1666
out:
1667
    sd_finish_aiocb(acb);
1668
}
1669

    
1670
/* Delete current working VDI on the snapshot chain */
1671
static bool sd_delete(BDRVSheepdogState *s)
1672
{
1673
    unsigned int wlen = SD_MAX_VDI_LEN, rlen = 0;
1674
    SheepdogVdiReq hdr = {
1675
        .opcode = SD_OP_DEL_VDI,
1676
        .vdi_id = s->inode.vdi_id,
1677
        .data_length = wlen,
1678
        .flags = SD_FLAG_CMD_WRITE,
1679
    };
1680
    SheepdogVdiRsp *rsp = (SheepdogVdiRsp *)&hdr;
1681
    int fd, ret;
1682

    
1683
    fd = connect_to_sdog(s);
1684
    if (fd < 0) {
1685
        return false;
1686
    }
1687

    
1688
    ret = do_req(fd, (SheepdogReq *)&hdr, s->name, &wlen, &rlen);
1689
    closesocket(fd);
1690
    if (ret) {
1691
        return false;
1692
    }
1693
    switch (rsp->result) {
1694
    case SD_RES_NO_VDI:
1695
        error_report("%s was already deleted", s->name);
1696
        /* fall through */
1697
    case SD_RES_SUCCESS:
1698
        break;
1699
    default:
1700
        error_report("%s, %s", sd_strerror(rsp->result), s->name);
1701
        return false;
1702
    }
1703

    
1704
    return true;
1705
}
1706

    
1707
/*
1708
 * Create a writable VDI from a snapshot
1709
 */
1710
static int sd_create_branch(BDRVSheepdogState *s)
1711
{
1712
    int ret, fd;
1713
    uint32_t vid;
1714
    char *buf;
1715
    bool deleted;
1716

    
1717
    dprintf("%" PRIx32 " is snapshot.\n", s->inode.vdi_id);
1718

    
1719
    buf = g_malloc(SD_INODE_SIZE);
1720

    
1721
    /*
1722
     * Even If deletion fails, we will just create extra snapshot based on
1723
     * the workding VDI which was supposed to be deleted. So no need to
1724
     * false bail out.
1725
     */
1726
    deleted = sd_delete(s);
1727
    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &vid,
1728
                       !deleted);
1729
    if (ret) {
1730
        goto out;
1731
    }
1732

    
1733
    dprintf("%" PRIx32 " is created.\n", vid);
1734

    
1735
    fd = connect_to_sdog(s);
1736
    if (fd < 0) {
1737
        ret = fd;
1738
        goto out;
1739
    }
1740

    
1741
    ret = read_object(fd, buf, vid_to_vdi_oid(vid), s->inode.nr_copies,
1742
                      SD_INODE_SIZE, 0, s->cache_flags);
1743

    
1744
    closesocket(fd);
1745

    
1746
    if (ret < 0) {
1747
        goto out;
1748
    }
1749

    
1750
    memcpy(&s->inode, buf, sizeof(s->inode));
1751

    
1752
    s->is_snapshot = false;
1753
    ret = 0;
1754
    dprintf("%" PRIx32 " was newly created.\n", s->inode.vdi_id);
1755

    
1756
out:
1757
    g_free(buf);
1758

    
1759
    return ret;
1760
}
1761

    
1762
/*
1763
 * Send I/O requests to the server.
1764
 *
1765
 * This function sends requests to the server, links the requests to
1766
 * the inflight_list in BDRVSheepdogState, and exits without
1767
 * waiting the response.  The responses are received in the
1768
 * `aio_read_response' function which is called from the main loop as
1769
 * a fd handler.
1770
 *
1771
 * Returns 1 when we need to wait a response, 0 when there is no sent
1772
 * request and -errno in error cases.
1773
 */
1774
static int coroutine_fn sd_co_rw_vector(void *p)
1775
{
1776
    SheepdogAIOCB *acb = p;
1777
    int ret = 0;
1778
    unsigned long len, done = 0, total = acb->nb_sectors * BDRV_SECTOR_SIZE;
1779
    unsigned long idx = acb->sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE;
1780
    uint64_t oid;
1781
    uint64_t offset = (acb->sector_num * BDRV_SECTOR_SIZE) % SD_DATA_OBJ_SIZE;
1782
    BDRVSheepdogState *s = acb->common.bs->opaque;
1783
    SheepdogInode *inode = &s->inode;
1784
    AIOReq *aio_req;
1785

    
1786
    if (acb->aiocb_type == AIOCB_WRITE_UDATA && s->is_snapshot) {
1787
        /*
1788
         * In the case we open the snapshot VDI, Sheepdog creates the
1789
         * writable VDI when we do a write operation first.
1790
         */
1791
        ret = sd_create_branch(s);
1792
        if (ret) {
1793
            acb->ret = -EIO;
1794
            goto out;
1795
        }
1796
    }
1797

    
1798
    /*
1799
     * Make sure we don't free the aiocb before we are done with all requests.
1800
     * This additional reference is dropped at the end of this function.
1801
     */
1802
    acb->nr_pending++;
1803

    
1804
    while (done != total) {
1805
        uint8_t flags = 0;
1806
        uint64_t old_oid = 0;
1807
        bool create = false;
1808

    
1809
        oid = vid_to_data_oid(inode->data_vdi_id[idx], idx);
1810

    
1811
        len = MIN(total - done, SD_DATA_OBJ_SIZE - offset);
1812

    
1813
        switch (acb->aiocb_type) {
1814
        case AIOCB_READ_UDATA:
1815
            if (!inode->data_vdi_id[idx]) {
1816
                qemu_iovec_memset(acb->qiov, done, 0, len);
1817
                goto done;
1818
            }
1819
            break;
1820
        case AIOCB_WRITE_UDATA:
1821
            if (!inode->data_vdi_id[idx]) {
1822
                create = true;
1823
            } else if (!is_data_obj_writable(inode, idx)) {
1824
                /* Copy-On-Write */
1825
                create = true;
1826
                old_oid = oid;
1827
                flags = SD_FLAG_CMD_COW;
1828
            }
1829
            break;
1830
        case AIOCB_DISCARD_OBJ:
1831
            /*
1832
             * We discard the object only when the whole object is
1833
             * 1) allocated 2) trimmed. Otherwise, simply skip it.
1834
             */
1835
            if (len != SD_DATA_OBJ_SIZE || inode->data_vdi_id[idx] == 0) {
1836
                goto done;
1837
            }
1838
            break;
1839
        default:
1840
            break;
1841
        }
1842

    
1843
        if (create) {
1844
            dprintf("update ino (%" PRIu32 ") %" PRIu64 " %" PRIu64 " %ld\n",
1845
                    inode->vdi_id, oid,
1846
                    vid_to_data_oid(inode->data_vdi_id[idx], idx), idx);
1847
            oid = vid_to_data_oid(inode->vdi_id, idx);
1848
            dprintf("new oid %" PRIx64 "\n", oid);
1849
        }
1850

    
1851
        aio_req = alloc_aio_req(s, acb, oid, len, offset, flags, old_oid, done);
1852

    
1853
        if (create) {
1854
            AIOReq *areq;
1855
            QLIST_FOREACH(areq, &s->inflight_aio_head, aio_siblings) {
1856
                if (areq->oid == oid) {
1857
                    /*
1858
                     * Sheepdog cannot handle simultaneous create
1859
                     * requests to the same object.  So we cannot send
1860
                     * the request until the previous request
1861
                     * finishes.
1862
                     */
1863
                    aio_req->flags = 0;
1864
                    aio_req->base_oid = 0;
1865
                    QLIST_INSERT_HEAD(&s->pending_aio_head, aio_req,
1866
                                      aio_siblings);
1867
                    goto done;
1868
                }
1869
            }
1870
        }
1871

    
1872
        QLIST_INSERT_HEAD(&s->inflight_aio_head, aio_req, aio_siblings);
1873
        ret = add_aio_request(s, aio_req, acb->qiov->iov, acb->qiov->niov,
1874
                              create, acb->aiocb_type);
1875
        if (ret < 0) {
1876
            error_report("add_aio_request is failed");
1877
            free_aio_req(s, aio_req);
1878
            acb->ret = -EIO;
1879
            goto out;
1880
        }
1881
    done:
1882
        offset = 0;
1883
        idx++;
1884
        done += len;
1885
    }
1886
out:
1887
    if (!--acb->nr_pending) {
1888
        return acb->ret;
1889
    }
1890
    return 1;
1891
}
1892

    
1893
/*
 * Coroutine write entry point.
 *
 * For growable devices a write past the current end first extends the
 * VDI via sd_truncate().  The request is then handed to
 * sd_co_rw_vector(); if that reports outstanding requests the coroutine
 * yields and returns acb->ret once resumed.
 */
static coroutine_fn int sd_co_writev(BlockDriverState *bs, int64_t sector_num,
                        int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb;
    int ret;

    if (bs->growable && sector_num + nb_sectors > bs->total_sectors) {
        int64_t end_sector = sector_num + nb_sectors;

        ret = sd_truncate(bs, end_sector * BDRV_SECTOR_SIZE);
        if (ret < 0) {
            return ret;
        }
        bs->total_sectors = end_sector;
    }

    acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    acb->aio_done_func = sd_write_done;
    acb->aiocb_type = AIOCB_WRITE_UDATA;

    ret = sd_co_rw_vector(acb);
    if (ret > 0) {
        /* requests are in flight: wait to be resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    /* nothing was sent, or an error occurred */
    qemu_aio_release(acb);
    return ret;
}
1921

    
1922
/*
 * Coroutine read entry point.
 *
 * Hands the request to sd_co_rw_vector(); if that reports outstanding
 * requests the coroutine yields and returns acb->ret once resumed,
 * otherwise the aiocb is released and the result returned directly.
 */
static coroutine_fn int sd_co_readv(BlockDriverState *bs, int64_t sector_num,
                       int nb_sectors, QEMUIOVector *qiov)
{
    SheepdogAIOCB *acb = sd_aio_setup(bs, qiov, sector_num, nb_sectors);
    int ret;

    acb->aiocb_type = AIOCB_READ_UDATA;
    acb->aio_done_func = sd_finish_aiocb;

    ret = sd_co_rw_vector(acb);
    if (ret > 0) {
        /* requests are in flight: wait to be resumed */
        qemu_coroutine_yield();
        return acb->ret;
    }

    qemu_aio_release(acb);
    return ret;
}
1942

    
1943
/*
 * Flush the VDI on the server.
 *
 * Does nothing (returns 0) unless cache_flags is exactly
 * SD_FLAG_CMD_CACHE.  Otherwise an AIOCB_FLUSH_CACHE request is sent
 * for the vdi object and the coroutine yields until it is resumed,
 * returning acb->ret.  A send failure is returned immediately.
 */
static int coroutine_fn sd_co_flush_to_disk(BlockDriverState *bs)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogAIOCB *acb;
    AIOReq *flush_req;
    int ret;

    if (s->cache_flags != SD_FLAG_CMD_CACHE) {
        return 0;
    }

    acb = sd_aio_setup(bs, NULL, 0, 0);
    acb->aiocb_type = AIOCB_FLUSH_CACHE;
    acb->aio_done_func = sd_finish_aiocb;

    flush_req = alloc_aio_req(s, acb, vid_to_vdi_oid(s->inode.vdi_id),
                              0, 0, 0, 0, 0);
    QLIST_INSERT_HEAD(&s->inflight_aio_head, flush_req, aio_siblings);

    ret = add_aio_request(s, flush_req, NULL, 0, false, acb->aiocb_type);
    if (ret >= 0) {
        qemu_coroutine_yield();
        return acb->ret;
    }

    error_report("add_aio_request is failed");
    free_aio_req(s, flush_req);
    qemu_aio_release(acb);
    return ret;
}
1972

    
1973
/*
 * Create a snapshot of the current working VDI.
 *
 * Steps, as performed below:
 *  1) refuse if the opened VDI is itself a snapshot (-EINVAL);
 *  2) copy vm_state_size, vm_clock_nsec and the snapshot name (as tag)
 *     from sn_info into the in-memory inode and write the inode header
 *     (everything except data_vdi_id) back with write_object();
 *  3) call do_sd_create() with the current VDI id as base to obtain a new
 *     VDI id;
 *  4) read the header of the new inode and copy it over s->inode.
 *
 * Returns the last helper's result: negative on failure, otherwise the
 * value returned by the final read_object() call.
 */
static int sd_snapshot_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
{
    BDRVSheepdogState *s = bs->opaque;
    int ret, fd;
    uint32_t new_vid;
    /* NULL until allocated so the common cleanup path can always g_free it */
    SheepdogInode *inode = NULL;
    unsigned int datalen;

    dprintf("sn_info: name %s id_str %s s: name %s vm_state_size %" PRId64 " "
            "is_snapshot %d\n", sn_info->name, sn_info->id_str,
            s->name, sn_info->vm_state_size, s->is_snapshot);

    if (s->is_snapshot) {
        error_report("You can't create a snapshot of a snapshot VDI, "
                     "%s (%" PRIu32 ").", s->name, s->inode.vdi_id);

        return -EINVAL;
    }

    dprintf("%s %s\n", sn_info->name, sn_info->id_str);

    s->inode.vm_state_size = sn_info->vm_state_size;
    s->inode.vm_clock_nsec = sn_info->vm_clock_nsec;
    /* It appears that inode.tag does not require a NUL terminator,
     * which means this use of strncpy is ok.
     */
    strncpy(s->inode.tag, sn_info->name, sizeof(s->inode.tag));
    /* we don't need to update entire object */
    datalen = SD_INODE_SIZE - sizeof(s->inode.data_vdi_id);

    /* refresh inode. */
    fd = connect_to_sdog(s);
    if (fd < 0) {
        /* No socket was opened, so there is nothing to close or free. */
        return fd;
    }

    ret = write_object(fd, (char *)&s->inode, vid_to_vdi_oid(s->inode.vdi_id),
                       s->inode.nr_copies, datalen, 0, false, s->cache_flags);
    if (ret < 0) {
        error_report("failed to write snapshot's inode.");
        goto cleanup;
    }

    ret = do_sd_create(s, s->name, s->inode.vdi_size, s->inode.vdi_id, &new_vid,
                       1);
    if (ret < 0) {
        error_report("failed to create inode for snapshot. %s",
                     strerror(errno));
        goto cleanup;
    }

    inode = g_malloc(datalen);

    ret = read_object(fd, (char *)inode, vid_to_vdi_oid(new_vid),
                      s->inode.nr_copies, datalen, 0, s->cache_flags);

    if (ret < 0) {
        error_report("failed to read new inode info. %s", strerror(errno));
        goto cleanup;
    }

    memcpy(&s->inode, inode, datalen);
    dprintf("s->inode: name %s snap_id %x oid %x\n",
            s->inode.name, s->inode.snap_id, s->inode.vdi_id);

cleanup:
    /* The temporary header buffer was previously leaked on every path. */
    g_free(inode);
    closesocket(fd);
    return ret;
}
2043

    
2044
/*
2045
 * We implement rollback(loadvm) operation to the specified snapshot by
2046
 * 1) switch to the snapshot
2047
 * 2) rely on sd_create_branch to delete working VDI and
2048
 * 3) create a new working VDI based on the speicified snapshot
2049
 */
2050
static int sd_snapshot_goto(BlockDriverState *bs, const char *snapshot_id)
2051
{
2052
    BDRVSheepdogState *s = bs->opaque;
2053
    BDRVSheepdogState *old_s;
2054
    char tag[SD_MAX_VDI_TAG_LEN];
2055
    uint32_t snapid = 0;
2056
    int ret = 0;
2057

    
2058
    old_s = g_malloc(sizeof(BDRVSheepdogState));
2059

    
2060
    memcpy(old_s, s, sizeof(BDRVSheepdogState));
2061

    
2062
    snapid = strtoul(snapshot_id, NULL, 10);
2063
    if (snapid) {
2064
        tag[0] = 0;
2065
    } else {
2066
        pstrcpy(tag, sizeof(tag), s->name);
2067
    }
2068

    
2069
    ret = reload_inode(s, snapid, tag);
2070
    if (ret) {
2071
        goto out;
2072
    }
2073

    
2074
    if (!s->inode.vm_state_size) {
2075
        error_report("Invalid snapshot");
2076
        ret = -ENOENT;
2077
        goto out;
2078
    }
2079

    
2080
    s->is_snapshot = true;
2081

    
2082
    g_free(old_s);
2083

    
2084
    return 0;
2085
out:
2086
    /* recover bdrv_sd_state */
2087
    memcpy(s, old_s, sizeof(BDRVSheepdogState));
2088
    g_free(old_s);
2089

    
2090
    error_report("failed to open. recover old bdrv_sd_state.");
2091

    
2092
    return ret;
2093
}
2094

    
2095
/*
 * Snapshot deletion hook.  Not implemented: both arguments are ignored and
 * success (0) is reported unconditionally.
 */
static int sd_snapshot_delete(BlockDriverState *bs, const char *snapshot_id)
{
    /* FIXME: Delete specified snapshot id.  */
    (void)bs;
    (void)snapshot_id;

    return 0;
}
2100

    
2101
/*
 * Enumerate the snapshots that belong to the opened VDI.
 *
 * First fetches the VDI-in-use bitmap with SD_OP_READ_VDIS, then walks VDI
 * ids starting at the hash of s->name until it hits an id whose bit is
 * clear (or until nr entries were collected).  Every inode whose name
 * matches s->name and for which is_snapshot() is true is recorded.
 *
 * On success *psn_tab receives a g_malloc0()-allocated table and the
 * number of filled entries is returned.  On failure a negative value is
 * returned, the table (if any) is freed and *psn_tab is set to NULL, so
 * the caller never has to free anything on the error path.
 */
static int sd_snapshot_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogReq req;
    int fd, nr = 1024, ret, max = BITS_TO_LONGS(SD_NR_VDIS) * sizeof(long);
    QEMUSnapshotInfo *sn_tab = NULL;
    unsigned wlen, rlen;
    int found = 0;
    /* NOTE(review): static, presumably to keep the large inode off the
     * stack; this makes the function non-reentrant -- confirm acceptable. */
    static SheepdogInode inode;
    unsigned long *vdi_inuse;
    unsigned int start_nr;
    uint64_t hval;
    uint32_t vid;

    vdi_inuse = g_malloc(max);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    rlen = max;
    wlen = 0;

    memset(&req, 0, sizeof(req));

    req.opcode = SD_OP_READ_VDIS;
    req.data_length = max;

    ret = do_req(fd, (SheepdogReq *)&req, vdi_inuse, &wlen, &rlen);

    closesocket(fd);
    if (ret) {
        goto out;
    }

    sn_tab = g_malloc0(nr * sizeof(*sn_tab));

    /* calculate a vdi id with hash function */
    hval = fnv_64a_buf(s->name, strlen(s->name), FNV1A_64_INIT);
    start_nr = hval & (SD_NR_VDIS - 1);

    fd = connect_to_sdog(s);
    if (fd < 0) {
        ret = fd;
        goto out;
    }

    for (vid = start_nr; found < nr; vid = (vid + 1) % SD_NR_VDIS) {
        if (!test_bit(vid, vdi_inuse)) {
            break;
        }

        /* we don't need to read entire object */
        ret = read_object(fd, (char *)&inode, vid_to_vdi_oid(vid),
                          0, SD_INODE_SIZE - sizeof(inode.data_vdi_id), 0,
                          s->cache_flags);

        if (ret) {
            /*
             * NOTE(review): the failed id is skipped, but ret keeps its
             * value; if this was the last id examined the whole call
             * still reports the error.  Behaviour kept as-is.
             */
            continue;
        }

        if (!strcmp(inode.name, s->name) && is_snapshot(&inode)) {
            sn_tab[found].date_sec = inode.snap_ctime >> 32;
            sn_tab[found].date_nsec = inode.snap_ctime & 0xffffffff;
            sn_tab[found].vm_state_size = inode.vm_state_size;
            sn_tab[found].vm_clock_nsec = inode.vm_clock_nsec;

            snprintf(sn_tab[found].id_str, sizeof(sn_tab[found].id_str), "%u",
                     inode.snap_id);
            pstrcpy(sn_tab[found].name,
                    MIN(sizeof(sn_tab[found].name), sizeof(inode.tag)),
                    inode.tag);
            found++;
        }
    }

    closesocket(fd);
out:
    g_free(vdi_inuse);

    if (ret < 0) {
        /* Do not hand a table back together with an error: free it here
         * instead of leaking it (g_free(NULL) is a no-op). */
        g_free(sn_tab);
        *psn_tab = NULL;
        return ret;
    }

    *psn_tab = sn_tab;
    return found;
}
2191

    
2192
/*
 * Transfer `size` bytes of VM state between `data` and the vmstate objects
 * of the current VDI, starting at byte position `pos`.
 *
 * The range is processed in chunks that never cross an SD_DATA_OBJ_SIZE
 * boundary; each chunk targets the object id computed by
 * vid_to_vmstate_oid() for its object index.  When `load` is non-zero the
 * chunks are read with read_object(), otherwise they are written with
 * write_object(), whose `create` argument is true for chunks that begin at
 * offset 0 of an object.
 *
 * Returns `size` on success, the negative result of connect_to_sdog() if
 * the connection fails, or the negative result of the failing object
 * operation.
 */
static int do_load_save_vmstate(BDRVSheepdogState *s, uint8_t *data,
                                int64_t pos, int size, int load)
{
    bool create;
    int fd, ret = 0, remaining = size;
    unsigned int data_len;
    uint64_t vmstate_oid;
    uint32_t vdi_index;
    uint64_t offset;

    fd = connect_to_sdog(s);
    if (fd < 0) {
        return fd;
    }

    while (remaining) {
        vdi_index = pos / SD_DATA_OBJ_SIZE;
        offset = pos % SD_DATA_OBJ_SIZE;

        data_len = MIN(remaining, SD_DATA_OBJ_SIZE - offset);

        vmstate_oid = vid_to_vmstate_oid(s->inode.vdi_id, vdi_index);

        create = (offset == 0);
        if (load) {
            ret = read_object(fd, (char *)data, vmstate_oid,
                              s->inode.nr_copies, data_len, offset,
                              s->cache_flags);
        } else {
            ret = write_object(fd, (char *)data, vmstate_oid,
                               s->inode.nr_copies, data_len, offset, create,
                               s->cache_flags);
        }

        if (ret < 0) {
            /* Name the operation that actually failed; the message used
             * to say "save" even for loads. */
            error_report("failed to %s vmstate %s", load ? "load" : "save",
                         strerror(errno));
            goto cleanup;
        }

        pos += data_len;
        data += data_len;
        remaining -= data_len;
    }
    ret = size;
cleanup:
    closesocket(fd);
    return ret;
}
2240

    
2241
/*
 * Save VM state: flatten the I/O vector into one block-aligned bounce
 * buffer, pass it to do_load_save_vmstate() in save mode (load == 0) and
 * free the buffer again.  Returns whatever do_load_save_vmstate() returns.
 */
static int sd_save_vmstate(BlockDriverState *bs, QEMUIOVector *qiov,
                           int64_t pos)
{
    BDRVSheepdogState *s = bs->opaque;
    void *bounce = qemu_blockalign(bs, qiov->size);
    int rc;

    qemu_iovec_to_buf(qiov, 0, bounce, qiov->size);

    rc = do_load_save_vmstate(s, (uint8_t *) bounce, pos, qiov->size, 0);

    qemu_vfree(bounce);
    return rc;
}
2255

    
2256
/*
 * Load VM state: thin wrapper that forwards to do_load_save_vmstate() in
 * load mode (load == 1) and returns its result unchanged.
 */
static int sd_load_vmstate(BlockDriverState *bs, uint8_t *data,
                           int64_t pos, int size)
{
    BDRVSheepdogState *state = bs->opaque;
    int rc;

    rc = do_load_save_vmstate(state, data, pos, size, 1);
    return rc;
}
2263

    
2264

    
2265
/*
 * Coroutine discard entry point.
 *
 * Returns 0 immediately when s->discard_supported is false.  Otherwise an
 * AIOCB of type AIOCB_DISCARD_OBJ is set up for the sector range and
 * submitted through sd_co_rw_vector().  A result <= 0 releases the AIOCB
 * and is returned as-is; a positive result makes the coroutine yield and
 * return acb->ret once it is resumed.
 */
static coroutine_fn int sd_co_discard(BlockDriverState *bs, int64_t sector_num,
                                      int nb_sectors)
{
    BDRVSheepdogState *s = bs->opaque;
    SheepdogAIOCB *acb;
    /* Placeholder vector handed to sd_aio_setup(); it is never initialised
     * here.  NOTE(review): confirm the discard path never reads it. */
    QEMUIOVector dummy;
    int rc;

    if (!s->discard_supported) {
        return 0;
    }

    acb = sd_aio_setup(bs, &dummy, sector_num, nb_sectors);
    acb->aio_done_func = sd_finish_aiocb;
    acb->aiocb_type = AIOCB_DISCARD_OBJ;

    rc = sd_co_rw_vector(acb);
    if (rc > 0) {
        qemu_coroutine_yield();
        return acb->ret;
    }

    qemu_aio_release(acb);
    return rc;
}
2291

    
2292
static coroutine_fn int
2293
sd_co_is_allocated(BlockDriverState *bs, int64_t sector_num, int nb_sectors,
2294
                   int *pnum)
2295
{
2296
    BDRVSheepdogState *s = bs->opaque;
2297
    SheepdogInode *inode = &s->inode;
2298
    unsigned long start = sector_num * BDRV_SECTOR_SIZE / SD_DATA_OBJ_SIZE,
2299
                  end = DIV_ROUND_UP((sector_num + nb_sectors) *
2300
                                     BDRV_SECTOR_SIZE, SD_DATA_OBJ_SIZE);
2301
    unsigned long idx;
2302
    int ret = 1;
2303

    
2304
    for (idx = start; idx < end; idx++) {
2305
        if (inode->data_vdi_id[idx] == 0) {
2306
            break;
2307
        }
2308
    }
2309
    if (idx == start) {
2310
        /* Get the longest length of unallocated sectors */
2311
        ret = 0;
2312
        for (idx = start + 1; idx < end; idx++) {
2313
            if (inode->data_vdi_id[idx] != 0) {
2314
                break;
2315
            }
2316
        }
2317
    }
2318

    
2319
    *pnum = (idx - start) * SD_DATA_OBJ_SIZE / BDRV_SECTOR_SIZE;
2320
    if (*pnum > nb_sectors) {
2321
        *pnum = nb_sectors;
2322
    }
2323
    return ret;
2324
}
2325

    
2326
static QEMUOptionParameter sd_create_options[] = {
2327
    {
2328
        .name = BLOCK_OPT_SIZE,
2329
        .type = OPT_SIZE,
2330
        .help = "Virtual disk size"
2331
    },
2332
    {
2333
        .name = BLOCK_OPT_BACKING_FILE,
2334
        .type = OPT_STRING,
2335
        .help = "File name of a base image"
2336
    },
2337
    {
2338
        .name = BLOCK_OPT_PREALLOC,
2339
        .type = OPT_STRING,
2340
        .help = "Preallocation mode (allowed values: off, full)"
2341
    },
2342
    { NULL }
2343
};
2344

    
2345
static BlockDriver bdrv_sheepdog = {
2346
    .format_name    = "sheepdog",
2347
    .protocol_name  = "sheepdog",
2348
    .instance_size  = sizeof(BDRVSheepdogState),
2349
    .bdrv_file_open = sd_open,
2350
    .bdrv_close     = sd_close,
2351
    .bdrv_create    = sd_create,
2352
    .bdrv_getlength = sd_getlength,
2353
    .bdrv_truncate  = sd_truncate,
2354

    
2355
    .bdrv_co_readv  = sd_co_readv,
2356
    .bdrv_co_writev = sd_co_writev,
2357
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2358
    .bdrv_co_discard = sd_co_discard,
2359
    .bdrv_co_is_allocated = sd_co_is_allocated,
2360

    
2361
    .bdrv_snapshot_create   = sd_snapshot_create,
2362
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2363
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2364
    .bdrv_snapshot_list     = sd_snapshot_list,
2365

    
2366
    .bdrv_save_vmstate  = sd_save_vmstate,
2367
    .bdrv_load_vmstate  = sd_load_vmstate,
2368

    
2369
    .create_options = sd_create_options,
2370
};
2371

    
2372
static BlockDriver bdrv_sheepdog_tcp = {
2373
    .format_name    = "sheepdog",
2374
    .protocol_name  = "sheepdog+tcp",
2375
    .instance_size  = sizeof(BDRVSheepdogState),
2376
    .bdrv_file_open = sd_open,
2377
    .bdrv_close     = sd_close,
2378
    .bdrv_create    = sd_create,
2379
    .bdrv_getlength = sd_getlength,
2380
    .bdrv_truncate  = sd_truncate,
2381

    
2382
    .bdrv_co_readv  = sd_co_readv,
2383
    .bdrv_co_writev = sd_co_writev,
2384
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2385
    .bdrv_co_discard = sd_co_discard,
2386
    .bdrv_co_is_allocated = sd_co_is_allocated,
2387

    
2388
    .bdrv_snapshot_create   = sd_snapshot_create,
2389
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2390
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2391
    .bdrv_snapshot_list     = sd_snapshot_list,
2392

    
2393
    .bdrv_save_vmstate  = sd_save_vmstate,
2394
    .bdrv_load_vmstate  = sd_load_vmstate,
2395

    
2396
    .create_options = sd_create_options,
2397
};
2398

    
2399
static BlockDriver bdrv_sheepdog_unix = {
2400
    .format_name    = "sheepdog",
2401
    .protocol_name  = "sheepdog+unix",
2402
    .instance_size  = sizeof(BDRVSheepdogState),
2403
    .bdrv_file_open = sd_open,
2404
    .bdrv_close     = sd_close,
2405
    .bdrv_create    = sd_create,
2406
    .bdrv_getlength = sd_getlength,
2407
    .bdrv_truncate  = sd_truncate,
2408

    
2409
    .bdrv_co_readv  = sd_co_readv,
2410
    .bdrv_co_writev = sd_co_writev,
2411
    .bdrv_co_flush_to_disk  = sd_co_flush_to_disk,
2412
    .bdrv_co_discard = sd_co_discard,
2413
    .bdrv_co_is_allocated = sd_co_is_allocated,
2414

    
2415
    .bdrv_snapshot_create   = sd_snapshot_create,
2416
    .bdrv_snapshot_goto     = sd_snapshot_goto,
2417
    .bdrv_snapshot_delete   = sd_snapshot_delete,
2418
    .bdrv_snapshot_list     = sd_snapshot_list,
2419

    
2420
    .bdrv_save_vmstate  = sd_save_vmstate,
2421
    .bdrv_load_vmstate  = sd_load_vmstate,
2422

    
2423
    .create_options = sd_create_options,
2424
};
2425

    
2426
static void bdrv_sheepdog_init(void)
2427
{
2428
    bdrv_register(&bdrv_sheepdog);
2429
    bdrv_register(&bdrv_sheepdog_tcp);
2430
    bdrv_register(&bdrv_sheepdog_unix);
2431
}
2432
block_init(bdrv_sheepdog_init);