Statistics
| Branch: | Revision:

root / block / rbd.c @ 16a06b24

History | View | Annotate | Download (21.8 kB)

1
/*
2
 * QEMU Block driver for RADOS (Ceph)
3
 *
4
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5
 *                         Josh Durgin <josh.durgin@dreamhost.com>
6
 *
7
 * This work is licensed under the terms of the GNU GPL, version 2.  See
8
 * the COPYING file in the top-level directory.
9
 *
10
 */
11

    
12
#include <inttypes.h>
13

    
14
#include "qemu-common.h"
15
#include "qemu-error.h"
16
#include "block_int.h"
17

    
18
#include <rbd/librbd.h>
19

    
20
/*
 * When specifying the image filename use:
 *
 * rbd:poolname/devicename[@snapshotname][:option1=value1[:option2=value2...]]
 *
 * poolname must be the name of an existing rados pool.
 *
 * devicename is the name of the rbd image.
 *
 * Each option given is used to configure rados, and may be any valid
 * Ceph option, "id", or "conf".
 *
 * The "id" option indicates what user we should authenticate as to
 * the Ceph cluster.  If it is excluded we will use the Ceph default
 * (normally 'admin').
 *
 * The "conf" option specifies a Ceph configuration file to read.  If
 * it is not specified, we will read from the default Ceph locations
 * (e.g., /etc/ceph/ceph.conf).  To avoid reading _any_ configuration
 * file, specify conf=/dev/null.
 *
 * Configuration values containing :, @, or = can be escaped with a
 * leading "\".
 */
44

    
45
/* Object size derived from OBJ_DEFAULT_OBJ_ORDER (defined outside this file). */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/* Buffer sizes (including the terminating NUL) used when parsing filenames. */
#define RBD_MAX_CONF_NAME_SIZE 128  /* one option name */
#define RBD_MAX_CONF_VAL_SIZE 512   /* one option value */
#define RBD_MAX_CONF_SIZE 1024      /* whole "opt=val:opt=val..." string */
#define RBD_MAX_POOL_NAME_SIZE 128  /* pool name */
#define RBD_MAX_SNAP_NAME_SIZE 128  /* snapshot name */
/* Initial number of entries requested when listing snapshots. */
#define RBD_MAX_SNAPS 100
53

    
54
typedef struct RBDAIOCB {
55
    BlockDriverAIOCB common;
56
    QEMUBH *bh;
57
    int ret;
58
    QEMUIOVector *qiov;
59
    char *bounce;
60
    int write;
61
    int64_t sector_num;
62
    int error;
63
    struct BDRVRBDState *s;
64
    int cancelled;
65
} RBDAIOCB;
66

    
67
typedef struct RADOSCB {
68
    int rcbid;
69
    RBDAIOCB *acb;
70
    struct BDRVRBDState *s;
71
    int done;
72
    int64_t size;
73
    char *buf;
74
    int ret;
75
} RADOSCB;
76

    
77
/* Indices into BDRVRBDState.fds: the two ends of the notification pipe. */
#define RBD_FD_READ 0   /* end polled by the qemu thread */
#define RBD_FD_WRITE 1  /* end written by the rbd completion callback */
79

    
80
typedef struct BDRVRBDState {
81
    int fds[2];
82
    rados_t cluster;
83
    rados_ioctx_t io_ctx;
84
    rbd_image_t image;
85
    char name[RBD_MAX_IMAGE_NAME_SIZE];
86
    int qemu_aio_count;
87
    char *snap;
88
    int event_reader_pos;
89
    RADOSCB *event_rcb;
90
} BDRVRBDState;
91

    
92
static void rbd_aio_bh_cb(void *opaque);
93

    
94
/*
 * Copy the next token of @src into @dst.
 *
 * When @delim is not '\0', the token ends at the first @delim that is not
 * preceded by a backslash; that delimiter is overwritten with '\0' in @src
 * and *@p is set to the character following it.  If no delimiter is found
 * (or @delim is '\0') the token is the whole of @src and *@p is NULL.
 *
 * @dst/@dst_len: destination buffer and its size
 * @name:         human-readable token description used in error messages
 *
 * Returns 0 on success, -EINVAL if the token is empty or does not fit.
 */
static int qemu_rbd_next_tok(char *dst, int dst_len,
                             char *src, char delim,
                             const char *name,
                             char **p)
{
    int tok_len;

    *p = NULL;

    if (delim != '\0') {
        char *cur = src;

        while (*cur != '\0' && *cur != delim) {
            /* A backslash protects the character after it. */
            if (*cur == '\\' && cur[1] != '\0') {
                cur++;
            }
            cur++;
        }
        if (*cur == delim) {
            *cur = '\0';
            *p = cur + 1;
        }
    }

    tok_len = strlen(src);
    if (tok_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (tok_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);
    return 0;
}
131

    
132
/*
 * Remove backslash escapes from @src in place: every backslash that is
 * followed by another character is dropped and that character is kept
 * verbatim.  A trailing lone backslash is preserved.
 */
static void qemu_rbd_unescape(char *src)
{
    char *out = src;
    const char *in = src;

    while (*in != '\0') {
        if (in[0] == '\\' && in[1] != '\0') {
            in++;               /* skip the backslash itself */
        }
        *out++ = *in++;
    }
    *out = '\0';
}
144

    
145
/*
 * Return non-zero if @s contains character @c that is not protected by a
 * preceding backslash.  Uses the same escape rule as qemu_rbd_next_tok().
 */
static int qemu_rbd_has_unescaped(const char *s, char c)
{
    for (; *s != '\0'; s++) {
        if (*s == c) {
            return 1;
        }
        if (*s == '\\' && s[1] != '\0') {
            s++;
        }
    }
    return 0;
}

/*
 * Split "rbd:pool/image[@snap][:conf...]" into its components.
 *
 * @filename:        full filename, must start with "rbd:"
 * @pool/@pool_len:  receives the unescaped pool name
 * @snap/@snap_len:  receives the unescaped snapshot name, "" if none
 * @name/@name_len:  receives the unescaped image name
 * @conf/@conf_len:  receives the raw (still escaped) option string, "" if none
 *
 * Returns 0 on success and a negative errno value on failure; the output
 * buffers must not be relied upon after a failure.
 */
static int qemu_rbd_parsename(const char *filename,
                              char *pool, int pool_len,
                              char *snap, int snap_len,
                              char *name, int name_len,
                              char *conf, int conf_len)
{
    const char *start;
    char *p, *buf;
    int ret;

    if (!strstart(filename, "rbd:", &start)) {
        return -EINVAL;
    }

    /* The tokenizer writes NULs into its input, so work on a copy. */
    buf = g_strdup(start);
    p = buf;
    *snap = '\0';
    *conf = '\0';

    ret = qemu_rbd_next_tok(pool, pool_len, p, '/', "pool name", &p);
    if (ret < 0 || !p) {
        ret = -EINVAL;
        goto done;
    }
    qemu_rbd_unescape(pool);

    /*
     * Only an *unescaped* '@' introduces a snapshot name.  A plain strchr()
     * would also match an escaped one; the tokenizer would then find no
     * delimiter, leave p NULL, and the snapshot tokenizer would dereference
     * NULL.
     */
    if (qemu_rbd_has_unescaped(p, '@')) {
        /* An unescaped '@' exists, so p is non-NULL after this call. */
        ret = qemu_rbd_next_tok(name, name_len, p, '@', "object name", &p);
        if (ret < 0) {
            goto done;
        }
        ret = qemu_rbd_next_tok(snap, snap_len, p, ':', "snap name", &p);
        if (ret < 0) {
            goto done;
        }
        qemu_rbd_unescape(snap);
    } else {
        ret = qemu_rbd_next_tok(name, name_len, p, ':', "object name", &p);
        if (ret < 0) {
            /* name was not written; do not touch it. */
            goto done;
        }
    }
    qemu_rbd_unescape(name);
    if (!p) {
        /* No option string follows. */
        goto done;
    }

    ret = qemu_rbd_next_tok(conf, conf_len, p, '\0', "configuration", &p);

done:
    g_free(buf);
    return ret;
}
192

    
193
/*
 * Look for an "id=<value>" entry in the ':'-separated option string @conf.
 *
 * If one is found, <value> is copied (without unescaping) into @clientname,
 * NUL-terminated, and @clientname is returned.  Otherwise NULL is returned
 * and @clientname is left untouched.
 */
static char *qemu_rbd_parse_clientname(const char *conf, char *clientname)
{
    const char *opt = conf;

    while (*opt != '\0') {
        const char *sep = strchr(opt, ':');
        int opt_len;

        opt_len = sep ? sep - opt : strlen(opt);

        if (!strncmp(opt, "id=", 3)) {
            int id_len = opt_len - 3;

            strncpy(clientname, opt + 3, id_len);
            clientname[id_len] = '\0';
            return clientname;
        }

        if (!sep) {
            break;              /* that was the last option */
        }
        opt = sep + 1;
    }

    return NULL;
}
220

    
221
/*
 * Apply the "name=value:name=value..." option string @conf to @cluster.
 *
 * "conf" names a configuration file to read, "id" is skipped here, and
 * every other pair is passed to rados_conf_set().
 *
 * Returns a negative value on the first failure, otherwise the
 * (non-negative) result of the last operation performed.
 */
static int qemu_rbd_set_conf(rados_t cluster, const char *conf)
{
    char name[RBD_MAX_CONF_NAME_SIZE];
    char value[RBD_MAX_CONF_VAL_SIZE];
    char *copy = g_strdup(conf);    /* the tokenizer modifies its input */
    char *cursor;
    int ret = 0;

    for (cursor = copy; cursor != NULL; ) {
        ret = qemu_rbd_next_tok(name, sizeof(name), cursor,
                                '=', "conf option name", &cursor);
        if (ret < 0) {
            break;
        }
        qemu_rbd_unescape(name);

        if (cursor == NULL) {
            error_report("conf option %s has no value", name);
            ret = -EINVAL;
            break;
        }

        ret = qemu_rbd_next_tok(value, sizeof(value), cursor,
                                ':', "conf option value", &cursor);
        if (ret < 0) {
            break;
        }
        qemu_rbd_unescape(value);

        if (strcmp(name, "id") == 0) {
            /* ignore, this is parsed by qemu_rbd_parse_clientname() */
            continue;
        }

        if (strcmp(name, "conf") == 0) {
            ret = rados_conf_read_file(cluster, value);
            if (ret < 0) {
                error_report("error reading conf file %s", value);
                break;
            }
            continue;
        }

        ret = rados_conf_set(cluster, name, value);
        if (ret < 0) {
            error_report("invalid conf option %s", name);
            ret = -EINVAL;
            break;
        }
    }

    g_free(copy);
    return ret;
}
273

    
274
/*
 * Create a new rbd image.
 *
 * @filename: "rbd:pool/image[:options]" specification
 * @options:  creation options; BLOCK_OPT_SIZE gives the image size in bytes
 *            and BLOCK_OPT_CLUSTER_SIZE the object size (power of two,
 *            at least 4096)
 *
 * Returns the result of rbd_create() once the pool has been opened,
 * -EINVAL for a malformed filename or object size, and -EIO when the
 * cluster or pool cannot be set up.
 */
static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
{
    int64_t bytes = 0;
    uint64_t objsize;
    int obj_order = 0;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char name[RBD_MAX_IMAGE_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    rados_t cluster;
    rados_ioctx_t io_ctx;
    int ret;

    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           name, sizeof(name),
                           conf, sizeof(conf)) < 0) {
        return -EINVAL;
    }

    /* Read out options */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            bytes = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
            if (options->value.n) {
                objsize = options->value.n;
                if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    error_report("obj size needs to be power of 2");
                    return -EINVAL;
                }
                if (objsize < 4096) {
                    error_report("obj size too small");
                    return -EINVAL;
                }
                /*
                 * log2 of the object size.  Computed on the full 64-bit
                 * value: ffs() takes an int and would see 0 for sizes of
                 * 4 GiB and above, producing an order of -1.
                 */
                obj_order = 0;
                while ((objsize >> obj_order) != 1) {
                    obj_order++;
                }
            }
        }
        options++;
    }

    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
    if (rados_create(&cluster, clientname) < 0) {
        error_report("error initializing");
        return -EIO;
    }

    if (strstr(conf, "conf=") == NULL) {
        /* try default location, but ignore failure */
        rados_conf_read_file(cluster, NULL);
    }

    if (conf[0] != '\0' &&
        qemu_rbd_set_conf(cluster, conf) < 0) {
        error_report("error setting config options");
        rados_shutdown(cluster);
        return -EIO;
    }

    if (rados_connect(cluster) < 0) {
        error_report("error connecting");
        rados_shutdown(cluster);
        return -EIO;
    }

    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
        error_report("error opening pool %s", pool);
        rados_shutdown(cluster);
        return -EIO;
    }

    ret = rbd_create(io_ctx, name, bytes, &obj_order);
    rados_ioctx_destroy(io_ctx);
    rados_shutdown(cluster);

    return ret;
}
353

    
354
/*
355
 * This aio completion is being called from qemu_rbd_aio_event_reader()
356
 * and runs in qemu context. It schedules a bh, but just in case the aio
357
 * was not cancelled before.
358
 */
359
static void qemu_rbd_complete_aio(RADOSCB *rcb)
360
{
361
    RBDAIOCB *acb = rcb->acb;
362
    int64_t r;
363

    
364
    if (acb->cancelled) {
365
        qemu_vfree(acb->bounce);
366
        qemu_aio_release(acb);
367
        goto done;
368
    }
369

    
370
    r = rcb->ret;
371

    
372
    if (acb->write) {
373
        if (r < 0) {
374
            acb->ret = r;
375
            acb->error = 1;
376
        } else if (!acb->error) {
377
            acb->ret = rcb->size;
378
        }
379
    } else {
380
        if (r < 0) {
381
            memset(rcb->buf, 0, rcb->size);
382
            acb->ret = r;
383
            acb->error = 1;
384
        } else if (r < rcb->size) {
385
            memset(rcb->buf + r, 0, rcb->size - r);
386
            if (!acb->error) {
387
                acb->ret = rcb->size;
388
            }
389
        } else if (!acb->error) {
390
            acb->ret = r;
391
        }
392
    }
393
    /* Note that acb->bh can be NULL in case where the aio was cancelled */
394
    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
395
    qemu_bh_schedule(acb->bh);
396
done:
397
    g_free(rcb);
398
}
399

    
400
/*
401
 * aio fd read handler. It runs in the qemu context and calls the
402
 * completion handling of completed rados aio operations.
403
 */
404
static void qemu_rbd_aio_event_reader(void *opaque)
405
{
406
    BDRVRBDState *s = opaque;
407

    
408
    ssize_t ret;
409

    
410
    do {
411
        char *p = (char *)&s->event_rcb;
412

    
413
        /* now read the rcb pointer that was sent from a non qemu thread */
414
        ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
415
                   sizeof(s->event_rcb) - s->event_reader_pos);
416
        if (ret > 0) {
417
            s->event_reader_pos += ret;
418
            if (s->event_reader_pos == sizeof(s->event_rcb)) {
419
                s->event_reader_pos = 0;
420
                qemu_rbd_complete_aio(s->event_rcb);
421
                s->qemu_aio_count--;
422
            }
423
        }
424
    } while (ret < 0 && errno == EINTR);
425
}
426

    
427
static int qemu_rbd_aio_flush_cb(void *opaque)
428
{
429
    BDRVRBDState *s = opaque;
430

    
431
    return (s->qemu_aio_count > 0);
432
}
433

    
434
/*
 * Open the image described by @filename.
 *
 * Connects to the cluster, opens the pool and the image (or one of its
 * snapshots, in which case the device is marked read-only) and installs
 * the notification pipe used to complete asynchronous requests.
 *
 * Returns 0 on success or a negative value on failure; everything
 * acquired so far is released again on failure.
 */
static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    char conf[RBD_MAX_CONF_SIZE];
    char clientname_buf[RBD_MAX_CONF_SIZE];
    char *clientname;
    int r;

    r = qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name),
                           conf, sizeof(conf));
    if (r < 0) {
        return -EINVAL;
    }

    clientname = qemu_rbd_parse_clientname(conf, clientname_buf);
    r = rados_create(&s->cluster, clientname);
    if (r < 0) {
        error_report("error initializing");
        return r;
    }

    s->snap = (snap_buf[0] != '\0') ? g_strdup(snap_buf) : NULL;

    if (!strstr(conf, "conf=")) {
        /* try default location, but ignore failure */
        rados_conf_read_file(s->cluster, NULL);
    }

    if (conf[0] != '\0') {
        r = qemu_rbd_set_conf(s->cluster, conf);
        if (r < 0) {
            error_report("error setting config options");
            goto failed_shutdown;
        }
    }

    r = rados_connect(s->cluster);
    if (r < 0) {
        error_report("error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
    if (r < 0) {
        error_report("error opening pool %s", pool);
        goto failed_shutdown;
    }

    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
    if (r < 0) {
        error_report("error reading header from %s", s->name);
        goto failed_open;
    }

    /* A snapshot cannot be written to. */
    bs->read_only = (s->snap != NULL);

    s->event_reader_pos = 0;
    r = qemu_pipe(s->fds);
    if (r < 0) {
        error_report("error opening eventfd");
        goto failed;
    }
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
                            NULL, qemu_rbd_aio_flush_cb, NULL, s);

    return 0;

    /* Unwind in reverse order of acquisition. */
failed:
    rbd_close(s->image);
failed_open:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
    g_free(s->snap);
    return r;
}
519

    
520
/*
 * Close the image and release everything qemu_rbd_open() acquired.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    /*
     * Unregister the handler while the descriptor is still ours; once it
     * is closed the number may be handed out again to some other open().
     */
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL , NULL, NULL, NULL,
        NULL);
    close(s->fds[0]);
    close(s->fds[1]);

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    g_free(s->snap);
    rados_shutdown(s->cluster);
}
534

    
535
/*
536
 * Cancel aio. Since we don't reference acb in a non qemu threads,
537
 * it is safe to access it here.
538
 */
539
static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
540
{
541
    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
542
    acb->cancelled = 1;
543
}
544

    
545
static AIOPool rbd_aio_pool = {
546
    .aiocb_size = sizeof(RBDAIOCB),
547
    .cancel = qemu_rbd_aio_cancel,
548
};
549

    
550
/*
 * Write the @rcb pointer to the notification pipe so that the qemu thread
 * can pick it up and perform the completion handling in qemu context.
 *
 * Retries on EINTR and, when the pipe is full (EAGAIN), waits until it
 * becomes writable again.  Returns the result of the final write(): the
 * number of bytes written, or a negative value on any other error.
 */
static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
{
    int ret = 0;

    for (;;) {
        fd_set writable;
        int fd = s->fds[RBD_FD_WRITE];

        ret = write(fd, (void *)&rcb, sizeof(rcb));
        if (ret >= 0) {
            return ret;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno != EAGAIN) {
            return ret;
        }

        /* Pipe is full: block until there is room, then retry the write. */
        FD_ZERO(&writable);
        FD_SET(fd, &writable);
        do {
            ret = select(fd + 1, NULL, &writable, NULL, NULL);
        } while (ret < 0 && errno == EINTR);
    }
}
579

    
580
/*
581
 * This is the callback function for rbd_aio_read and _write
582
 *
583
 * Note: this function is being called from a non qemu thread so
584
 * we need to be careful about what we do here. Generally we only
585
 * write to the block notification pipe, and do the rest of the
586
 * io completion handling from qemu_rbd_aio_event_reader() which
587
 * runs in a qemu context.
588
 */
589
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
590
{
591
    int ret;
592
    rcb->ret = rbd_aio_get_return_value(c);
593
    rbd_aio_release(c);
594
    ret = qemu_rbd_send_pipe(rcb->s, rcb);
595
    if (ret < 0) {
596
        error_report("failed writing to acb->s->fds");
597
        g_free(rcb);
598
    }
599
}
600

    
601
/* Callback when all queued rbd_aio requests are complete */
602

    
603
static void rbd_aio_bh_cb(void *opaque)
604
{
605
    RBDAIOCB *acb = opaque;
606

    
607
    if (!acb->write) {
608
        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
609
    }
610
    qemu_vfree(acb->bounce);
611
    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
612
    qemu_bh_delete(acb->bh);
613
    acb->bh = NULL;
614

    
615
    qemu_aio_release(acb);
616
}
617

    
618
static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
619
                                           int64_t sector_num,
620
                                           QEMUIOVector *qiov,
621
                                           int nb_sectors,
622
                                           BlockDriverCompletionFunc *cb,
623
                                           void *opaque, int write)
624
{
625
    RBDAIOCB *acb;
626
    RADOSCB *rcb;
627
    rbd_completion_t c;
628
    int64_t off, size;
629
    char *buf;
630
    int r;
631

    
632
    BDRVRBDState *s = bs->opaque;
633

    
634
    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
635
    if (!acb) {
636
        return NULL;
637
    }
638
    acb->write = write;
639
    acb->qiov = qiov;
640
    acb->bounce = qemu_blockalign(bs, qiov->size);
641
    acb->ret = 0;
642
    acb->error = 0;
643
    acb->s = s;
644
    acb->cancelled = 0;
645
    acb->bh = NULL;
646

    
647
    if (write) {
648
        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
649
    }
650

    
651
    buf = acb->bounce;
652

    
653
    off = sector_num * BDRV_SECTOR_SIZE;
654
    size = nb_sectors * BDRV_SECTOR_SIZE;
655

    
656
    s->qemu_aio_count++; /* All the RADOSCB */
657

    
658
    rcb = g_malloc(sizeof(RADOSCB));
659
    rcb->done = 0;
660
    rcb->acb = acb;
661
    rcb->buf = buf;
662
    rcb->s = acb->s;
663
    rcb->size = size;
664
    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
665
    if (r < 0) {
666
        goto failed;
667
    }
668

    
669
    if (write) {
670
        r = rbd_aio_write(s->image, off, size, buf, c);
671
    } else {
672
        r = rbd_aio_read(s->image, off, size, buf, c);
673
    }
674

    
675
    if (r < 0) {
676
        goto failed;
677
    }
678

    
679
    return &acb->common;
680

    
681
failed:
682
    g_free(rcb);
683
    s->qemu_aio_count--;
684
    qemu_aio_release(acb);
685
    return NULL;
686
}
687

    
688
static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
689
                                            int64_t sector_num,
690
                                            QEMUIOVector *qiov,
691
                                            int nb_sectors,
692
                                            BlockDriverCompletionFunc *cb,
693
                                            void *opaque)
694
{
695
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
696
}
697

    
698
static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
699
                                             int64_t sector_num,
700
                                             QEMUIOVector *qiov,
701
                                             int nb_sectors,
702
                                             BlockDriverCompletionFunc *cb,
703
                                             void *opaque)
704
{
705
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
706
}
707

    
708
/*
 * Flush the image.  rbd_flush() only exists from librbd 0.1.1 on; with
 * older versions this is a no-op that reports success.
 */
static int qemu_rbd_flush(BlockDriverState *bs)
{
#if LIBRBD_VERSION_CODE >= LIBRBD_VERSION(0, 1, 1)
    /* rbd_flush added in 0.1.1 */
    BDRVRBDState *state = bs->opaque;

    return rbd_flush(state->image);
#else
    return 0;
#endif
}
718

    
719
/*
 * Fill @bdi with image information: the cluster size is the rbd object
 * size.  Returns 0 on success or the negative result of rbd_stat().
 */
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *state = bs->opaque;
    rbd_image_info_t image_info;
    int rc = rbd_stat(state->image, &image_info, sizeof(image_info));

    if (rc >= 0) {
        bdi->cluster_size = image_info.obj_size;
        rc = 0;
    }
    return rc;
}
733

    
734
/*
 * Return the image size as reported by rbd_stat(), or its negative
 * result on failure.
 */
static int64_t qemu_rbd_getlength(BlockDriverState *bs)
{
    BDRVRBDState *state = bs->opaque;
    rbd_image_info_t image_info;
    int rc = rbd_stat(state->image, &image_info, sizeof(image_info));

    if (rc < 0) {
        return rc;
    }
    return image_info.size;
}
747

    
748
/*
 * Resize the image to @offset bytes.  Returns 0 on success or the negative
 * result of rbd_resize().
 */
static int qemu_rbd_truncate(BlockDriverState *bs, int64_t offset)
{
    BDRVRBDState *state = bs->opaque;
    int rc = rbd_resize(state->image, offset);

    return rc < 0 ? rc : 0;
}
760

    
761
static int qemu_rbd_snap_create(BlockDriverState *bs,
762
                                QEMUSnapshotInfo *sn_info)
763
{
764
    BDRVRBDState *s = bs->opaque;
765
    int r;
766

    
767
    if (sn_info->name[0] == '\0') {
768
        return -EINVAL; /* we need a name for rbd snapshots */
769
    }
770

    
771
    /*
772
     * rbd snapshots are using the name as the user controlled unique identifier
773
     * we can't use the rbd snapid for that purpose, as it can't be set
774
     */
775
    if (sn_info->id_str[0] != '\0' &&
776
        strcmp(sn_info->id_str, sn_info->name) != 0) {
777
        return -EINVAL;
778
    }
779

    
780
    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
781
        return -ERANGE;
782
    }
783

    
784
    r = rbd_snap_create(s->image, sn_info->name);
785
    if (r < 0) {
786
        error_report("failed to create snap: %s", strerror(-r));
787
        return r;
788
    }
789

    
790
    return 0;
791
}
792

    
793
static int qemu_rbd_snap_list(BlockDriverState *bs,
794
                              QEMUSnapshotInfo **psn_tab)
795
{
796
    BDRVRBDState *s = bs->opaque;
797
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
798
    int i, snap_count;
799
    rbd_snap_info_t *snaps;
800
    int max_snaps = RBD_MAX_SNAPS;
801

    
802
    do {
803
        snaps = g_malloc(sizeof(*snaps) * max_snaps);
804
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
805
        if (snap_count < 0) {
806
            g_free(snaps);
807
        }
808
    } while (snap_count == -ERANGE);
809

    
810
    if (snap_count <= 0) {
811
        return snap_count;
812
    }
813

    
814
    sn_tab = g_malloc0(snap_count * sizeof(QEMUSnapshotInfo));
815

    
816
    for (i = 0; i < snap_count; i++) {
817
        const char *snap_name = snaps[i].name;
818

    
819
        sn_info = sn_tab + i;
820
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
821
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
822

    
823
        sn_info->vm_state_size = snaps[i].size;
824
        sn_info->date_sec = 0;
825
        sn_info->date_nsec = 0;
826
        sn_info->vm_clock_nsec = 0;
827
    }
828
    rbd_snap_list_end(snaps);
829

    
830
    *psn_tab = sn_tab;
831
    return snap_count;
832
}
833

    
834
static QEMUOptionParameter qemu_rbd_create_options[] = {
835
    {
836
     .name = BLOCK_OPT_SIZE,
837
     .type = OPT_SIZE,
838
     .help = "Virtual disk size"
839
    },
840
    {
841
     .name = BLOCK_OPT_CLUSTER_SIZE,
842
     .type = OPT_SIZE,
843
     .help = "RBD object size"
844
    },
845
    {NULL}
846
};
847

    
848
static BlockDriver bdrv_rbd = {
849
    .format_name        = "rbd",
850
    .instance_size      = sizeof(BDRVRBDState),
851
    .bdrv_file_open     = qemu_rbd_open,
852
    .bdrv_close         = qemu_rbd_close,
853
    .bdrv_create        = qemu_rbd_create,
854
    .bdrv_flush         = qemu_rbd_flush,
855
    .bdrv_get_info      = qemu_rbd_getinfo,
856
    .create_options     = qemu_rbd_create_options,
857
    .bdrv_getlength     = qemu_rbd_getlength,
858
    .bdrv_truncate      = qemu_rbd_truncate,
859
    .protocol_name      = "rbd",
860

    
861
    .bdrv_aio_readv     = qemu_rbd_aio_readv,
862
    .bdrv_aio_writev    = qemu_rbd_aio_writev,
863

    
864
    .bdrv_snapshot_create = qemu_rbd_snap_create,
865
    .bdrv_snapshot_list = qemu_rbd_snap_list,
866
};
867

    
868
static void bdrv_rbd_init(void)
869
{
870
    bdrv_register(&bdrv_rbd);
871
}
872

    
873
block_init(bdrv_rbd_init);