Statistics
| Branch: | Revision:

root / block / rbd.c @ 0e326109

History | View | Annotate | Download (26 kB)

1
/*
2
 * QEMU Block driver for RADOS (Ceph)
3
 *
4
 * Copyright (C) 2010 Christian Brunner <chb@muc.de>
5
 *
6
 * This work is licensed under the terms of the GNU GPL, version 2.  See
7
 * the COPYING file in the top-level directory.
8
 *
9
 */
10

    
11
#include "qemu-common.h"
12
#include "qemu-error.h"
13

    
14
#include "rbd_types.h"
15
#include "block_int.h"
16

    
17
#include <rados/librados.h>
18

    
19

    
20

    
21
/*
22
 * When specifying the image filename use:
23
 *
24
 * rbd:poolname/devicename
25
 *
26
 * poolname must be the name of an existing rados pool
27
 *
28
 * devicename is the basename for all objects used to
29
 * emulate the raw device.
30
 *
31
 * Metadata information (image size, ...) is stored in an
32
 * object with the name "devicename.rbd".
33
 *
34
 * The raw device is split into 4MB sized objects by default.
35
 * The sequence number is encoded in a 12 byte long hex-string,
36
 * and is attached to the devicename, separated by a dot.
37
 * e.g. "devicename.1234567890ab"
38
 *
39
 */
40

    
41
/*
 * Default object size in bytes.  The order is the log2 of the object size
 * (rbd_open computes objsize as 1ULL << order).  The previous definition
 * used OBJ_DEFAULT_OBJ_ORDER, which nothing in this file defines; the rest
 * of the file uses RBD_DEFAULT_OBJ_ORDER.
 */
#define OBJ_MAX_SIZE (1UL << RBD_DEFAULT_OBJ_ORDER)
42

    
43
typedef struct RBDAIOCB {
44
    BlockDriverAIOCB common;
45
    QEMUBH *bh;
46
    int ret;
47
    QEMUIOVector *qiov;
48
    char *bounce;
49
    int write;
50
    int64_t sector_num;
51
    int aiocnt;
52
    int error;
53
    struct BDRVRBDState *s;
54
    int cancelled;
55
} RBDAIOCB;
56

    
57
typedef struct RADOSCB {
58
    int rcbid;
59
    RBDAIOCB *acb;
60
    struct BDRVRBDState *s;
61
    int done;
62
    int64_t segsize;
63
    char *buf;
64
    int ret;
65
} RADOSCB;
66

    
67
#define RBD_FD_READ 0
68
#define RBD_FD_WRITE 1
69

    
70
typedef struct BDRVRBDState {
71
    int fds[2];
72
    rados_pool_t pool;
73
    rados_pool_t header_pool;
74
    char name[RBD_MAX_OBJ_NAME_SIZE];
75
    char block_name[RBD_MAX_BLOCK_NAME_SIZE];
76
    uint64_t size;
77
    uint64_t objsize;
78
    int qemu_aio_count;
79
    int event_reader_pos;
80
    RADOSCB *event_rcb;
81
} BDRVRBDState;
82

    
83
/* Shorthand for the on-disk image header structure. */
typedef struct rbd_obj_header_ondisk RbdHeader1;

/* Defined further down; rbd_complete_aio() needs it to schedule the bh. */
static void rbd_aio_bh_cb(void *opaque);
86

    
87
/*
 * Extract the next token from @src into @dst.
 *
 * If @delim is not '\0' and occurs in @src, the string is cut there (the
 * delimiter is overwritten with '\0') and *@p is set to the character after
 * it; otherwise *@p is set to NULL and the whole of @src is the token.
 *
 * @name is only used in error messages.
 *
 * Returns 0 on success, -EINVAL if the token is empty or does not fit into
 * @dst_len bytes including the terminator.
 */
static int rbd_next_tok(char *dst, int dst_len,
                        char *src, char delim,
                        const char *name,
                        char **p)
{
    char *sep = NULL;
    int tok_len;

    if (delim != '\0') {
        sep = strchr(src, delim);
    }

    if (sep) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    tok_len = strlen(src);
    if (tok_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (tok_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);
    return 0;
}
117

    
118
/*
 * Split a filename of the form "rbd:pool/name[@snap]" into its parts.
 *
 * @pool, @name and @snap receive the components; @snap is set to the empty
 * string when no '@' part is present.
 *
 * Returns 0 on success, -EINVAL when the prefix or the '/' separator is
 * missing, or the error from rbd_next_tok() for a bad component.
 */
static int rbd_parsename(const char *filename,
                         char *pool, int pool_len,
                         char *snap, int snap_len,
                         char *name, int name_len)
{
    const char *rest;
    char *work, *cursor;
    int rc;

    if (!strstart(filename, "rbd:", &rest)) {
        return -EINVAL;
    }

    /* rbd_next_tok() writes into its input, so tokenise a private copy */
    work = qemu_strdup(rest);
    cursor = work;

    rc = rbd_next_tok(pool, pool_len, cursor, '/', "pool name", &cursor);
    if (rc < 0 || cursor == NULL) {
        rc = -EINVAL;
    } else {
        rc = rbd_next_tok(name, name_len, cursor, '@', "object name", &cursor);
        if (rc >= 0) {
            if (cursor == NULL) {
                *snap = '\0';
            } else {
                rc = rbd_next_tok(snap, snap_len, cursor, '\0', "snap name",
                                  &cursor);
            }
        }
    }

    qemu_free(work);
    return rc;
}
154

    
155
/*
 * Build an encoded tmap operation: one op byte, the little-endian 32-bit
 * length of @name followed by its bytes, then a zero 32-bit length for an
 * empty value.
 *
 * The buffer is allocated here and stored in *@tmap_desc; release it with
 * free_tmap_op().  Returns the number of bytes encoded.
 */
static int create_tmap_op(uint8_t op, const char *name, char **tmap_desc)
{
    uint32_t name_len = strlen(name);
    uint32_t name_len_le = cpu_to_le32(name_len);
    uint32_t empty_len = 0; /* zero reads the same in either byte order */
    uint32_t total = 1 + (sizeof(uint32_t) + name_len) + sizeof(uint32_t);
    uint8_t *start, *cur;

    start = qemu_malloc(total);
    *tmap_desc = (char *)start;

    cur = start;
    *cur++ = op;

    memcpy(cur, &name_len_le, sizeof(name_len_le));
    cur += sizeof(name_len_le);

    memcpy(cur, name, name_len);
    cur += name_len;

    memcpy(cur, &empty_len, sizeof(empty_len));
    cur += sizeof(empty_len);

    return (char *)cur - *tmap_desc;
}
179

    
180
/* Release a descriptor buffer obtained from create_tmap_op(). */
static void free_tmap_op(char *tmap_desc)
{
    qemu_free(tmap_desc);
}
184

    
185
/*
 * Apply a CEPH_OSD_TMAP_SET operation for @name to the RBD_DIRECTORY object
 * of @pool.  Returns the result of rados_tmap_update(), or the negative
 * value from create_tmap_op() if building the operation failed.
 */
static int rbd_register_image(rados_pool_t pool, const char *name)
{
    char *op_buf;
    int op_len;
    int rc;

    op_len = create_tmap_op(CEPH_OSD_TMAP_SET, name, &op_buf);
    if (op_len < 0) {
        return op_len;
    }

    rc = rados_tmap_update(pool, RBD_DIRECTORY, op_buf, op_len);
    free_tmap_op(op_buf);

    return rc;
}
201

    
202
/*
 * Issue a zero-length write to @info_oid in @pool.
 * Returns 0 on success or the negative value from rados_write().
 */
static int touch_rbd_info(rados_pool_t pool, const char *info_oid)
{
    int rc = rados_write(pool, info_oid, 0, NULL, 0);

    return rc < 0 ? rc : 0;
}
210

    
211
/*
 * Obtain a block id by executing the "rbd" class method "assign_bid" on the
 * RBD_INFO object of @pool.
 *
 * *@id is set to 0 first and to the returned (little-endian decoded) value
 * on success.  Returns 0 on success or a negative rados error.
 */
static int rbd_assign_bid(rados_pool_t pool, uint64_t *id)
{
    uint64_t reply[1];
    int rc;

    *id = 0;

    rc = touch_rbd_info(pool, RBD_INFO);
    if (rc < 0) {
        return rc;
    }

    rc = rados_exec(pool, RBD_INFO, "rbd", "assign_bid", NULL,
                    0, (char *)reply, sizeof(reply));
    if (rc < 0) {
        return rc;
    }

    le64_to_cpus(reply);
    *id = reply[0];

    return 0;
}
234

    
235
/*
 * Create a new image: parse "rbd:pool/name", build the on-disk header,
 * obtain a block id, write the header object "<name>" RBD_SUFFIX and register
 * the image in the pool directory.
 *
 * Recognised options: BLOCK_OPT_SIZE (image size in bytes) and
 * BLOCK_OPT_CLUSTER_SIZE (object size; must be a power of two >= 4096).
 *
 * Returns 0 on success, -EINVAL for a bad filename or object size, -EEXIST
 * if the header object already exists, -EIO if rados cannot be initialised,
 * the pool cannot be opened or no block id can be assigned, or the negative
 * rados error from writing the header or registering the image.
 */
static int rbd_create(const char *filename, QEMUOptionParameter *options)
{
    int64_t bytes = 0;
    uint64_t size;
    time_t mtime;
    uint8_t obj_order = RBD_DEFAULT_OBJ_ORDER;
    char pool[RBD_MAX_SEG_NAME_SIZE];
    char n[RBD_MAX_SEG_NAME_SIZE];
    char name[RBD_MAX_OBJ_NAME_SIZE];
    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
    RbdHeader1 header;
    rados_pool_t p;
    uint64_t bid;
    uint32_t hi, lo;
    int ret;

    /* a snapshot part is parsed (and length-checked) but otherwise unused */
    if (rbd_parsename(filename,
                      pool, sizeof(pool),
                      snap_buf, sizeof(snap_buf),
                      name, sizeof(name)) < 0) {
        return -EINVAL;
    }

    snprintf(n, sizeof(n), "%s%s", name, RBD_SUFFIX);

    /* Read out options */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            bytes = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
            if (options->value.n) {
                int64_t objsize = options->value.n;

                if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    error_report("obj size needs to be power of 2");
                    return -EINVAL;
                }
                if (objsize < 4096) {
                    error_report("obj size too small");
                    return -EINVAL;
                }
                /*
                 * log2 of a positive power of two.  ffs() takes an int and
                 * would truncate object sizes that need more than 31 bits.
                 */
                obj_order = 0;
                while ((objsize >> obj_order) > 1) {
                    obj_order++;
                }
            }
        }
        options++;
    }

    memset(&header, 0, sizeof(header));
    pstrcpy(header.text, sizeof(header.text), RBD_HEADER_TEXT);
    pstrcpy(header.signature, sizeof(header.signature), RBD_HEADER_SIGNATURE);
    pstrcpy(header.version, sizeof(header.version), RBD_HEADER_VERSION);
    header.image_size = cpu_to_le64(bytes);
    header.options.order = obj_order;
    header.options.crypt_type = RBD_CRYPT_NONE;
    header.options.comp_type = RBD_COMP_NONE;
    header.snap_seq = 0;
    header.snap_count = 0;

    if (rados_initialize(0, NULL) < 0) {
        error_report("error initializing");
        return -EIO;
    }

    if (rados_open_pool(pool, &p)) {
        error_report("error opening pool %s", pool);
        rados_deinitialize();
        return -EIO;
    }

    /* check for existing rbd header file */
    ret = rados_stat(p, n, &size, &mtime);
    if (ret == 0) {
        ret = -EEXIST;
        goto done;
    }

    ret = rbd_assign_bid(p, &bid);
    if (ret < 0) {
        error_report("failed assigning block id");
        /* go through the common exit so the pool handle is closed too */
        ret = -EIO;
        goto done;
    }
    hi = bid >> 32;
    lo = bid & 0xFFFFFFFF;
    snprintf(header.block_name, sizeof(header.block_name), "rb.%x.%x", hi, lo);

    /* create header file */
    ret = rados_write(p, n, 0, (const char *)&header, sizeof(header));
    if (ret < 0) {
        goto done;
    }

    ret = rbd_register_image(p, name);
done:
    rados_close_pool(p);
    rados_deinitialize();

    return ret;
}
338

    
339
/*
340
 * This aio completion is being called from rbd_aio_event_reader() and
341
 * runs in qemu context. It schedules a bh, but just in case the aio
342
 * was not cancelled before.
343
 */
344
static void rbd_complete_aio(RADOSCB *rcb)
345
{
346
    RBDAIOCB *acb = rcb->acb;
347
    int64_t r;
348

    
349
    acb->aiocnt--;
350

    
351
    if (acb->cancelled) {
352
        if (!acb->aiocnt) {
353
            qemu_vfree(acb->bounce);
354
            qemu_aio_release(acb);
355
        }
356
        goto done;
357
    }
358

    
359
    r = rcb->ret;
360

    
361
    if (acb->write) {
362
        if (r < 0) {
363
            acb->ret = r;
364
            acb->error = 1;
365
        } else if (!acb->error) {
366
            acb->ret += rcb->segsize;
367
        }
368
    } else {
369
        if (r == -ENOENT) {
370
            memset(rcb->buf, 0, rcb->segsize);
371
            if (!acb->error) {
372
                acb->ret += rcb->segsize;
373
            }
374
        } else if (r < 0) {
375
            memset(rcb->buf, 0, rcb->segsize);
376
            acb->ret = r;
377
            acb->error = 1;
378
        } else if (r < rcb->segsize) {
379
            memset(rcb->buf + r, 0, rcb->segsize - r);
380
            if (!acb->error) {
381
                acb->ret += rcb->segsize;
382
            }
383
        } else if (!acb->error) {
384
            acb->ret += r;
385
        }
386
    }
387
    /* Note that acb->bh can be NULL in case where the aio was cancelled */
388
    if (!acb->aiocnt) {
389
        acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
390
        qemu_bh_schedule(acb->bh);
391
    }
392
done:
393
    qemu_free(rcb);
394
}
395

    
396
/*
397
 * aio fd read handler. It runs in the qemu context and calls the
398
 * completion handling of completed rados aio operations.
399
 */
400
static void rbd_aio_event_reader(void *opaque)
401
{
402
    BDRVRBDState *s = opaque;
403

    
404
    ssize_t ret;
405

    
406
    do {
407
        char *p = (char *)&s->event_rcb;
408

    
409
        /* now read the rcb pointer that was sent from a non qemu thread */
410
        if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
411
                        sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
412
            if (ret > 0) {
413
                s->event_reader_pos += ret;
414
                if (s->event_reader_pos == sizeof(s->event_rcb)) {
415
                    s->event_reader_pos = 0;
416
                    rbd_complete_aio(s->event_rcb);
417
                    s->qemu_aio_count --;
418
                }
419
            }
420
        }
421
    } while (ret < 0 && errno == EINTR);
422
}
423

    
424
static int rbd_aio_flush_cb(void *opaque)
425
{
426
    BDRVRBDState *s = opaque;
427

    
428
    return (s->qemu_aio_count > 0);
429
}
430

    
431

    
432
/*
 * Install the snapshot context described by @header on @pool and select the
 * snapshot named @snap for reading (or none when @snap is NULL).
 *
 * The header carries snap_count snapshot records followed by snap_names_len
 * bytes of NUL-terminated names, one per record and in the same order.
 *
 * Returns the result of rados_set_snap_context(), -ENOENT if @snap is not
 * in the list, or -EINVAL if the name list does not hold snap_count
 * terminated names within snap_names_len bytes.
 */
static int rbd_set_snapc(rados_pool_t pool, const char *snap, RbdHeader1 *header)
{
    uint32_t snap_count = le32_to_cpu(header->snap_count);
    uint64_t snap_names_len = le64_to_cpu(header->snap_names_len);
    rados_snap_t *snaps = NULL;
    rados_snap_t snapid = 0;
    rados_snap_t seq;
    uint32_t i;
    int r;

    if (snap_count) {
        const char *header_snap = (const char *)&header->snaps[snap_count];
        const char *end = header_snap + snap_names_len;

        /* size with the byte-swapped count, not the raw on-disk field */
        snaps = qemu_malloc(sizeof(rados_snap_t) * snap_count);

        for (i = 0; i < snap_count; i++) {
            const char *nul = NULL;

            /* the name must be terminated inside the names area */
            if (header_snap < end) {
                nul = memchr(header_snap, '\0', end - header_snap);
            }
            if (!nul) {
                error_report("bad header, snapshot list broken");
                qemu_free(snaps);
                return -EINVAL;
            }

            snaps[i] = le64_to_cpu(header->snaps[i].id);

            if (snap && strcmp(snap, header_snap) == 0) {
                snapid = snaps[i];
            }

            header_snap = nul + 1;
        }
    }

    /* an id of 0 means the requested name was not matched */
    if (snap && !snapid) {
        error_report("snapshot not found");
        qemu_free(snaps);
        return -ENOENT;
    }
    seq = le32_to_cpu(header->snap_seq);

    r = rados_set_snap_context(pool, seq, snaps, snap_count);

    rados_set_snap(pool, snapid);

    qemu_free(snaps);

    return r;
}
476

    
477
#define BUF_READ_START_LEN    4096
478

    
479
static int rbd_read_header(BDRVRBDState *s, char **hbuf)
480
{
481
    char *buf = NULL;
482
    char n[RBD_MAX_SEG_NAME_SIZE];
483
    uint64_t len = BUF_READ_START_LEN;
484
    int r;
485

    
486
    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
487

    
488
    buf = qemu_malloc(len);
489

    
490
    r = rados_read(s->header_pool, n, 0, buf, len);
491
    if (r < 0) {
492
        goto failed;
493
    }
494

    
495
    if (r < len) {
496
        goto done;
497
    }
498

    
499
    qemu_free(buf);
500
    buf = qemu_malloc(len);
501

    
502
    r = rados_stat(s->header_pool, n, &len, NULL);
503
    if (r < 0) {
504
        goto failed;
505
    }
506

    
507
    r = rados_read(s->header_pool, n, 0, buf, len);
508
    if (r < 0) {
509
        goto failed;
510
    }
511

    
512
done:
513
    *hbuf = buf;
514
    return 0;
515

    
516
failed:
517
    qemu_free(buf);
518
    return r;
519
}
520

    
521
/*
 * Open the image named by @filename ("rbd:pool/name[@snap]").
 *
 * Opens two handles on the pool, reads and checks the image header, installs
 * the snapshot context and sets up the non-blocking pipe through which rados
 * completions are handed to the qemu thread.  An image opened at a snapshot
 * is marked read-only.
 *
 * Returns 0 on success, -EINVAL for a bad filename, -EMEDIUMTYPE for an
 * unrecognised header, or another negative error from the failing step.
 */
static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
{
    BDRVRBDState *s = bs->opaque;
    RbdHeader1 *header;
    char pool[RBD_MAX_SEG_NAME_SIZE];
    char snap_buf[RBD_MAX_SEG_NAME_SIZE];
    char *snap = NULL;
    char *hbuf = NULL;
    int r;

    if (rbd_parsename(filename, pool, sizeof(pool),
                      snap_buf, sizeof(snap_buf),
                      s->name, sizeof(s->name)) < 0) {
        return -EINVAL;
    }
    if (snap_buf[0] != '\0') {
        snap = snap_buf;
    }

    r = rados_initialize(0, NULL);
    if (r < 0) {
        error_report("error initializing");
        return r;
    }

    r = rados_open_pool(pool, &s->pool);
    if (r) {
        error_report("error opening pool %s", pool);
        rados_deinitialize();
        return r;
    }

    r = rados_open_pool(pool, &s->header_pool);
    if (r) {
        error_report("error opening pool %s", pool);
        /* the first handle is already open and must not be leaked */
        rados_close_pool(s->pool);
        rados_deinitialize();
        return r;
    }

    r = rbd_read_header(s, &hbuf);
    if (r < 0) {
        error_report("error reading header from %s", s->name);
        goto failed;
    }

    if (memcmp(hbuf + 64, RBD_HEADER_SIGNATURE, 4)) {
        error_report("Invalid header signature");
        r = -EMEDIUMTYPE;
        goto failed;
    }

    if (memcmp(hbuf + 68, RBD_HEADER_VERSION, 8)) {
        error_report("Unknown image version");
        r = -EMEDIUMTYPE;
        goto failed;
    }

    header = (RbdHeader1 *) hbuf;
    /* shifting a 64-bit value by 64 or more is undefined behaviour */
    if (header->options.order >= 64) {
        error_report("Invalid object order in header");
        r = -EMEDIUMTYPE;
        goto failed;
    }
    s->size = le64_to_cpu(header->image_size);
    s->objsize = 1ULL << header->options.order;
    memcpy(s->block_name, header->block_name, sizeof(header->block_name));

    r = rbd_set_snapc(s->pool, snap, header);
    if (r < 0) {
        error_report("failed setting snap context: %s", strerror(-r));
        goto failed;
    }

    bs->read_only = (snap != NULL);

    s->event_reader_pos = 0;
    r = qemu_pipe(s->fds);
    if (r < 0) {
        error_report("error opening eventfd");
        goto failed;
    }
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], rbd_aio_event_reader, NULL,
        rbd_aio_flush_cb, NULL, s);

    qemu_free(hbuf);

    return 0;

failed:
    qemu_free(hbuf);

    rados_close_pool(s->header_pool);
    rados_close_pool(s->pool);
    rados_deinitialize();
    return r;
}
610

    
611
/*
 * Tear down an opened image: detach the pipe from the aio fd handlers,
 * close the pipe, close both pool handles and deinitialise rados.
 */
static void rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    /*
     * Remove the handler while the descriptor is still open, so it is never
     * left registered on a closed descriptor number.
     */
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
        NULL);
    close(s->fds[0]);
    close(s->fds[1]);

    rados_close_pool(s->header_pool);
    rados_close_pool(s->pool);
    rados_deinitialize();
}
624

    
625
/*
626
 * Cancel aio. Since we don't reference acb in a non qemu threads,
627
 * it is safe to access it here.
628
 */
629
static void rbd_aio_cancel(BlockDriverAIOCB *blockacb)
630
{
631
    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
632
    acb->cancelled = 1;
633
}
634

    
635
static AIOPool rbd_aio_pool = {
636
    .aiocb_size = sizeof(RBDAIOCB),
637
    .cancel = rbd_aio_cancel,
638
};
639

    
640
/*
641
 * This is the callback function for rados_aio_read and _write
642
 *
643
 * Note: this function is being called from a non qemu thread so
644
 * we need to be careful about what we do here. Generally we only
645
 * write to the block notification pipe, and do the rest of the
646
 * io completion handling from rbd_aio_event_reader() which
647
 * runs in a qemu context.
648
 */
649
static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
650
{
651
    int ret;
652
    rcb->ret = rados_aio_get_return_value(c);
653
    rados_aio_release(c);
654
    while (1) {
655
        fd_set wfd;
656
        int fd = rcb->s->fds[RBD_FD_WRITE];
657

    
658
        /* send the rcb pointer to the qemu thread that is responsible
659
           for the aio completion. Must do it in a qemu thread context */
660
        ret = write(fd, (void *)&rcb, sizeof(rcb));
661
        if (ret >= 0) {
662
            break;
663
        }
664
        if (errno == EINTR) {
665
            continue;
666
        }
667
        if (errno != EAGAIN) {
668
            break;
669
        }
670

    
671
        FD_ZERO(&wfd);
672
        FD_SET(fd, &wfd);
673
        do {
674
            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
675
        } while (ret < 0 && errno == EINTR);
676
    }
677

    
678
    if (ret < 0) {
679
        error_report("failed writing to acb->s->fds\n");
680
        qemu_free(rcb);
681
    }
682
}
683

    
684
/* Callback when all queued rados_aio requests are complete */
685

    
686
static void rbd_aio_bh_cb(void *opaque)
687
{
688
    RBDAIOCB *acb = opaque;
689

    
690
    if (!acb->write) {
691
        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
692
    }
693
    qemu_vfree(acb->bounce);
694
    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
695
    qemu_bh_delete(acb->bh);
696
    acb->bh = NULL;
697

    
698
    qemu_aio_release(acb);
699
}
700

    
701
static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
702
                                           int64_t sector_num,
703
                                           QEMUIOVector *qiov,
704
                                           int nb_sectors,
705
                                           BlockDriverCompletionFunc *cb,
706
                                           void *opaque, int write)
707
{
708
    RBDAIOCB *acb;
709
    RADOSCB *rcb;
710
    rados_completion_t c;
711
    char n[RBD_MAX_SEG_NAME_SIZE];
712
    int64_t segnr, segoffs, segsize, last_segnr;
713
    int64_t off, size;
714
    char *buf;
715

    
716
    BDRVRBDState *s = bs->opaque;
717

    
718
    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
719
    acb->write = write;
720
    acb->qiov = qiov;
721
    acb->bounce = qemu_blockalign(bs, qiov->size);
722
    acb->aiocnt = 0;
723
    acb->ret = 0;
724
    acb->error = 0;
725
    acb->s = s;
726
    acb->cancelled = 0;
727
    acb->bh = NULL;
728

    
729
    if (write) {
730
        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
731
    }
732

    
733
    buf = acb->bounce;
734

    
735
    off = sector_num * BDRV_SECTOR_SIZE;
736
    size = nb_sectors * BDRV_SECTOR_SIZE;
737
    segnr = off / s->objsize;
738
    segoffs = off % s->objsize;
739
    segsize = s->objsize - segoffs;
740

    
741
    last_segnr = ((off + size - 1) / s->objsize);
742
    acb->aiocnt = (last_segnr - segnr) + 1;
743

    
744
    s->qemu_aio_count += acb->aiocnt; /* All the RADOSCB */
745

    
746
    while (size > 0) {
747
        if (size < segsize) {
748
            segsize = size;
749
        }
750

    
751
        snprintf(n, sizeof(n), "%s.%012" PRIx64, s->block_name,
752
                 segnr);
753

    
754
        rcb = qemu_malloc(sizeof(RADOSCB));
755
        rcb->done = 0;
756
        rcb->acb = acb;
757
        rcb->segsize = segsize;
758
        rcb->buf = buf;
759
        rcb->s = acb->s;
760

    
761
        if (write) {
762
            rados_aio_create_completion(rcb, NULL,
763
                                        (rados_callback_t) rbd_finish_aiocb,
764
                                        &c);
765
            rados_aio_write(s->pool, n, segoffs, buf, segsize, c);
766
        } else {
767
            rados_aio_create_completion(rcb,
768
                                        (rados_callback_t) rbd_finish_aiocb,
769
                                        NULL, &c);
770
            rados_aio_read(s->pool, n, segoffs, buf, segsize, c);
771
        }
772

    
773
        buf += segsize;
774
        size -= segsize;
775
        segoffs = 0;
776
        segsize = s->objsize;
777
        segnr++;
778
    }
779

    
780
    return &acb->common;
781
}
782

    
783
static BlockDriverAIOCB *rbd_aio_readv(BlockDriverState * bs,
784
                                       int64_t sector_num, QEMUIOVector * qiov,
785
                                       int nb_sectors,
786
                                       BlockDriverCompletionFunc * cb,
787
                                       void *opaque)
788
{
789
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
790
}
791

    
792
static BlockDriverAIOCB *rbd_aio_writev(BlockDriverState * bs,
793
                                        int64_t sector_num, QEMUIOVector * qiov,
794
                                        int nb_sectors,
795
                                        BlockDriverCompletionFunc * cb,
796
                                        void *opaque)
797
{
798
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
799
}
800

    
801
/* Report the image's object size as the cluster size.  Always returns 0. */
static int rbd_getinfo(BlockDriverState * bs, BlockDriverInfo * bdi)
{
    const BDRVRBDState *state = bs->opaque;

    bdi->cluster_size = state->objsize;

    return 0;
}
807

    
808
/* Return the image size in bytes as read from the header at open time. */
static int64_t rbd_getlength(BlockDriverState * bs)
{
    const BDRVRBDState *state = bs->opaque;

    return state->size;
}
814

    
815
/*
 * Create a snapshot named sn_info->name.
 *
 * Allocates a self-managed snapshot id, records name and id in the image
 * header through the "rbd" class method "snap_add", then re-reads the header
 * and installs the new snapshot context.  On success sn_info->id_str is set
 * to the snapshot name.
 *
 * Returns 0 on success; -EINVAL if the name is empty or differs from a given
 * id_str; -ERANGE if the name does not fit into id_str; otherwise the
 * negative error of the failing rados/header operation.
 */
static int rbd_snap_create(BlockDriverState *bs, QEMUSnapshotInfo *sn_info)
{
    BDRVRBDState *s = bs->opaque;
    char inbuf[512], outbuf[128];
    char n[RBD_MAX_SEG_NAME_SIZE];
    char *p = inbuf;
    char *end = inbuf + sizeof(inbuf);
    char *hbuf = NULL;
    uint64_t snap_id, snap_id_le;
    uint32_t name_len, name_len_le;
    int r;

    if (sn_info->name[0] == '\0') {
        return -EINVAL; /* we need a name for rbd snapshots */
    }

    /*
     * rbd snapshots are using the name as the user controlled unique identifier
     * we can't use the rbd snapid for that purpose, as it can't be set
     */
    if (sn_info->id_str[0] != '\0' &&
        strcmp(sn_info->id_str, sn_info->name) != 0) {
        return -EINVAL;
    }

    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
        return -ERANGE;
    }
    name_len = strlen(sn_info->name);

    r = rados_selfmanaged_snap_create(s->header_pool, &snap_id);
    if (r < 0) {
        error_report("failed to create snap id: %s", strerror(-r));
        return r;
    }

    /*
     * Encode: le32 name length, name bytes, le64 snap id.  The whole buffer
     * is sent, so clear it first.  Fields are stored with memcpy because
     * the positions inside the char buffer are not suitably aligned for
     * direct uint32_t/uint64_t stores.
     */
    memset(inbuf, 0, sizeof(inbuf));
    if ((size_t)(end - p) < sizeof(name_len_le) + name_len + sizeof(snap_id_le)) {
        error_report("invalid input parameter");
        return -EINVAL;
    }

    name_len_le = cpu_to_le32(name_len);
    memcpy(p, &name_len_le, sizeof(name_len_le));
    p += sizeof(name_len_le);

    memcpy(p, sn_info->name, name_len);
    p += name_len;

    snap_id_le = cpu_to_le64(snap_id);
    memcpy(p, &snap_id_le, sizeof(snap_id_le));

    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);

    r = rados_exec(s->header_pool, n, "rbd", "snap_add", inbuf,
                   sizeof(inbuf), outbuf, sizeof(outbuf));
    if (r < 0) {
        error_report("rbd.snap_add execution failed failed: %s", strerror(-r));
        return r;
    }

    /* fits: the length was checked against sizeof(id_str) above */
    pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), sn_info->name);

    r = rbd_read_header(s, &hbuf);
    if (r < 0) {
        error_report("failed reading header: %s", strerror(-r));
        return r;
    }

    r = rbd_set_snapc(s->pool, sn_info->name, (RbdHeader1 *) hbuf);
    if (r < 0) {
        error_report("failed setting snap context: %s", strerror(-r));
    } else {
        r = 0;
    }

    /* the header copy is only needed for rbd_set_snapc(); free on all paths */
    qemu_free(hbuf);
    return r;
}
893

    
894
/*
 * Decode a little-endian 32-bit value at *@p into *@v and advance *@p by 4.
 * Returns 0 on success, or -ERANGE (leaving *@p and *@v untouched) if fewer
 * than 4 bytes remain before @end.
 */
static int decode32(char **p, const char *end, uint32_t *v)
{
    /* compare distances rather than forming a pointer past the buffer */
    if (end - *p < 4) {
        return -ERANGE;
    }

    /* the source position may be unaligned, so copy instead of casting */
    memcpy(v, *p, 4);
    le32_to_cpus(v);
    *p += 4;
    return 0;
}
905

    
906
/*
 * Decode a little-endian 64-bit value at *@p into *@v and advance *@p by 8.
 * Returns 0 on success, or -ERANGE (leaving *@p and *@v untouched) if fewer
 * than 8 bytes remain before @end.
 */
static int decode64(char **p, const char *end, uint64_t *v)
{
    /* compare distances rather than forming a pointer past the buffer */
    if (end - *p < 8) {
        return -ERANGE;
    }

    /* the source position may be unaligned, so copy instead of casting */
    memcpy(v, *p, 8);
    le64_to_cpus(v);
    *p += 8;
    return 0;
}
917

    
918
/*
 * Decode a length-prefixed string (le32 length followed by that many bytes)
 * at *@p.  On success *@s receives a newly allocated, NUL-terminated copy
 * (free with qemu_free), *@p is advanced past the string and the string
 * length is returned.  Returns -ERANGE, without allocating, if the length
 * prefix or the string body would run past @end.
 */
static int decode_str(char **p, const char *end, char **s)
{
    uint32_t len;
    int r;

    r = decode32(p, end, &len);
    if (r < 0) {
        return r;
    }

    /* the length comes from the wire: make sure the bytes really exist */
    if (len > (size_t)(end - *p)) {
        return -ERANGE;
    }

    *s = qemu_malloc(len + 1);
    memcpy(*s, *p, len);
    *p += len;
    (*s)[len] = '\0';

    return len;
}
934

    
935
/*
 * List the snapshots of the image.
 *
 * Executes the "rbd" class method "snap_list" on the header object and
 * decodes the reply: le64 snap_seq, le32 snap_count, then per snapshot
 * le64 id, le64 image size and a length-prefixed name.
 *
 * On success *@psn_tab receives a newly allocated array with one entry per
 * snapshot (name and id_str both set to the snapshot name) and the number
 * of snapshots is returned.  On failure a negative error is returned and
 * *@psn_tab is left untouched.
 */
static int rbd_snap_list(BlockDriverState *bs, QEMUSnapshotInfo **psn_tab)
{
    BDRVRBDState *s = bs->opaque;
    char n[RBD_MAX_SEG_NAME_SIZE];
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
    RbdHeader1 *header;
    char *hbuf = NULL;
    char *outbuf = NULL, *end, *buf;
    uint64_t len;
    uint64_t snap_seq;
    uint32_t snap_count;
    uint32_t i;
    int r;

    /* read header to estimate how much space we need to read the snap
     * list */
    r = rbd_read_header(s, &hbuf);
    if (r < 0) {
        goto done_err;
    }
    header = (RbdHeader1 *)hbuf;
    len = le64_to_cpu(header->snap_names_len);
    len += 1024; /* should have already been enough, but new snapshots might
                    already been created since we read the header. just allocate
                    a bit more, so that in most cases it'll suffice anyway */
    qemu_free(hbuf);

    snprintf(n, sizeof(n), "%s%s", s->name, RBD_SUFFIX);
    while (1) {
        qemu_free(outbuf);
        outbuf = qemu_malloc(len);

        r = rados_exec(s->header_pool, n, "rbd", "snap_list", NULL, 0,
                       outbuf, len);
        if (r < 0) {
            error_report("rbd.snap_list execution failed failed: %s", strerror(-r));
            goto done_err;
        }
        if (r != len) {
            break;
        }

        /* if we're here, we probably raced with some snaps creation */
        len *= 2;
    }
    buf = outbuf;
    /* only the r bytes actually returned are valid, not the whole buffer */
    end = buf + r;

    r = decode64(&buf, end, &snap_seq);
    if (r < 0) {
        goto done_err;
    }
    r = decode32(&buf, end, &snap_count);
    if (r < 0) {
        goto done_err;
    }

    /*
     * Every entry needs at least 8 + 8 + 4 bytes; reject counts the reply
     * cannot hold before sizing an allocation from them.
     */
    if (snap_count > (uint64_t)(end - buf) / 20) {
        r = -ERANGE;
        goto done_err;
    }

    sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
    for (i = 0; i < snap_count; i++) {
        uint64_t id, image_size;
        char *snap_name;

        r = decode64(&buf, end, &id);
        if (r < 0) {
            goto done_err;
        }
        r = decode64(&buf, end, &image_size);
        if (r < 0) {
            goto done_err;
        }
        r = decode_str(&buf, end, &snap_name);
        if (r < 0) {
            goto done_err;
        }

        sn_info = sn_tab + i;
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
        qemu_free(snap_name);

        sn_info->vm_state_size = image_size;
        sn_info->date_sec = 0;
        sn_info->date_nsec = 0;
        sn_info->vm_clock_nsec = 0;
    }
    *psn_tab = sn_tab;
    qemu_free(outbuf);
    return snap_count;
done_err:
    qemu_free(sn_tab);
    qemu_free(outbuf);
    return r;
}
1021

    
1022
static QEMUOptionParameter rbd_create_options[] = {
1023
    {
1024
     .name = BLOCK_OPT_SIZE,
1025
     .type = OPT_SIZE,
1026
     .help = "Virtual disk size"
1027
    },
1028
    {
1029
     .name = BLOCK_OPT_CLUSTER_SIZE,
1030
     .type = OPT_SIZE,
1031
     .help = "RBD object size"
1032
    },
1033
    {NULL}
1034
};
1035

    
1036
static BlockDriver bdrv_rbd = {
1037
    .format_name        = "rbd",
1038
    .instance_size      = sizeof(BDRVRBDState),
1039
    .bdrv_file_open     = rbd_open,
1040
    .bdrv_close         = rbd_close,
1041
    .bdrv_create        = rbd_create,
1042
    .bdrv_get_info      = rbd_getinfo,
1043
    .create_options     = rbd_create_options,
1044
    .bdrv_getlength     = rbd_getlength,
1045
    .protocol_name      = "rbd",
1046

    
1047
    .bdrv_aio_readv     = rbd_aio_readv,
1048
    .bdrv_aio_writev    = rbd_aio_writev,
1049

    
1050
    .bdrv_snapshot_create = rbd_snap_create,
1051
    .bdrv_snapshot_list = rbd_snap_list,
1052
};
1053

    
1054
static void bdrv_rbd_init(void)
1055
{
1056
    bdrv_register(&bdrv_rbd);
1057
}
1058

    
1059
block_init(bdrv_rbd_init);