Statistics
| Branch: | Revision:

root / block / rbd.c @ ad32e9c0

History | View | Annotate | Download (17.6 kB)

1
/*
2
 * QEMU Block driver for RADOS (Ceph)
3
 *
4
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
5
 *                         Josh Durgin <josh.durgin@dreamhost.com>
6
 *
7
 * This work is licensed under the terms of the GNU GPL, version 2.  See
8
 * the COPYING file in the top-level directory.
9
 *
10
 */
11

    
12
#include <inttypes.h>
13

    
14
#include "qemu-common.h"
15
#include "qemu-error.h"
16

    
17
#include "block_int.h"
18

    
19
#include <rbd/librbd.h>
20

    
21

    
22

    
23
/*
24
 * When specifying the image filename use:
25
 *
26
 * rbd:poolname/devicename
27
 *
28
 * poolname must be the name of an existing rados pool
29
 *
30
 * devicename is the basename for all objects used to
31
 * emulate the raw device.
32
 *
33
 * Metadata information (image size, ...) is stored in an
34
 * object with the name "devicename.rbd".
35
 *
36
 * The raw device is split into 4MB sized objects by default.
37
 * The sequence number is encoded in a 12 byte long hex-string,
38
 * and is attached to the devicename, separated by a dot.
39
 * e.g. "devicename.1234567890ab"
40
 *
41
 */
42

    
43
/*
 * Object size in bytes derived from OBJ_DEFAULT_OBJ_ORDER.
 * NOTE(review): neither OBJ_MAX_SIZE nor OBJ_DEFAULT_OBJ_ORDER is referenced
 * or defined in the visible part of this file - confirm it is still needed.
 */
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)

/*
 * Sizes of the fixed buffers used by this driver.  The pool and snapshot
 * name sizes bound the stack buffers filled by qemu_rbd_parsename();
 * RBD_MAX_SNAPS is the initial capacity tried by qemu_rbd_snap_list().
 * The RBD_MAX_CONF_* values are not used in the visible part of this file.
 */
#define RBD_MAX_CONF_NAME_SIZE 128
#define RBD_MAX_CONF_VAL_SIZE 512
#define RBD_MAX_CONF_SIZE 1024
#define RBD_MAX_POOL_NAME_SIZE 128
#define RBD_MAX_SNAP_NAME_SIZE 128
#define RBD_MAX_SNAPS 100
51

    
52
typedef struct RBDAIOCB {
53
    BlockDriverAIOCB common;
54
    QEMUBH *bh;
55
    int ret;
56
    QEMUIOVector *qiov;
57
    char *bounce;
58
    int write;
59
    int64_t sector_num;
60
    int error;
61
    struct BDRVRBDState *s;
62
    int cancelled;
63
} RBDAIOCB;
64

    
65
typedef struct RADOSCB {
66
    int rcbid;
67
    RBDAIOCB *acb;
68
    struct BDRVRBDState *s;
69
    int done;
70
    int64_t size;
71
    char *buf;
72
    int ret;
73
} RADOSCB;
74

    
75
#define RBD_FD_READ 0
76
#define RBD_FD_WRITE 1
77

    
78
typedef struct BDRVRBDState {
79
    int fds[2];
80
    rados_t cluster;
81
    rados_ioctx_t io_ctx;
82
    rbd_image_t image;
83
    char name[RBD_MAX_IMAGE_NAME_SIZE];
84
    int qemu_aio_count;
85
    char *snap;
86
    int event_reader_pos;
87
    RADOSCB *event_rcb;
88
} BDRVRBDState;
89

    
90
/* Forward declaration: qemu_rbd_complete_aio() schedules this bottom half. */
static void rbd_aio_bh_cb(void *opaque);
91

    
92
/*
 * Extract the leading token of src into dst.
 *
 * If delim is not '\0' and occurs in src, the first occurrence is
 * overwritten with a NUL byte (src is modified in place) and *p is set to
 * the character following it; otherwise *p is set to NULL and the whole of
 * src is the token.
 *
 * name is only used in the error messages.
 *
 * Returns 0 on success, -EINVAL if the token is empty or does not fit into
 * dst_len bytes including the terminating NUL.
 */
static int qemu_rbd_next_tok(char *dst, int dst_len,
                             char *src, char delim,
                             const char *name,
                             char **p)
{
    char *sep = NULL;
    int tok_len;

    if (delim != '\0') {
        sep = strchr(src, delim);
    }

    if (sep) {
        *sep = '\0';
        *p = sep + 1;
    } else {
        *p = NULL;
    }

    tok_len = strlen(src);
    if (tok_len >= dst_len) {
        error_report("%s too long", name);
        return -EINVAL;
    }
    if (tok_len == 0) {
        error_report("%s too short", name);
        return -EINVAL;
    }

    pstrcpy(dst, dst_len, src);
    return 0;
}
122

    
123
/*
 * Split a filename of the form "rbd:pool/name[@snap]" into its parts.
 *
 * pool, name and snap receive the respective components; snap is set to
 * the empty string when the filename carries no "@snap" suffix.
 *
 * Returns 0 on success and -EINVAL if the "rbd:" prefix or the '/' is
 * missing, or if a component is empty or too long for its buffer.
 */
static int qemu_rbd_parsename(const char *filename,
                              char *pool, int pool_len,
                              char *snap, int snap_len,
                              char *name, int name_len)
{
    const char *spec;
    char *copy, *cursor;
    int rc;

    if (!strstart(filename, "rbd:", &spec)) {
        return -EINVAL;
    }

    /* The tokenizer writes NUL bytes into its input, so work on a copy. */
    copy = qemu_strdup(spec);
    cursor = copy;

    rc = qemu_rbd_next_tok(pool, pool_len, cursor, '/', "pool name", &cursor);
    if (rc < 0 || cursor == NULL) {
        /* A pool name without a following "/name" is not acceptable. */
        rc = -EINVAL;
    } else {
        rc = qemu_rbd_next_tok(name, name_len, cursor, '@', "object name",
                               &cursor);
        if (rc >= 0) {
            if (cursor == NULL) {
                *snap = '\0';
            } else {
                rc = qemu_rbd_next_tok(snap, snap_len, cursor, '\0',
                                       "snap name", &cursor);
            }
        }
    }

    qemu_free(copy);
    return rc;
}
159

    
160
/*
 * Create a new rbd image.
 *
 * filename has the form "rbd:pool/name[@snap]"; a snapshot suffix is parsed
 * but otherwise ignored.  The options BLOCK_OPT_SIZE (image size in bytes)
 * and BLOCK_OPT_CLUSTER_SIZE (object size, a power of two >= 4096) are
 * honoured; an object size of 0 leaves obj_order at 0.
 *
 * Returns the result of rbd_create() on success paths, -EINVAL for a bad
 * filename or object size, and -EIO if the cluster or pool cannot be set up.
 */
static int qemu_rbd_create(const char *filename, QEMUOptionParameter *options)
{
    int64_t bytes = 0;
    int64_t objsize;
    int obj_order = 0;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char name[RBD_MAX_IMAGE_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    rados_t cluster;
    rados_ioctx_t io_ctx;
    int ret;

    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           name, sizeof(name)) < 0) {
        return -EINVAL;
    }

    /* Read out options */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            bytes = options->value.n;
        } else if (!strcmp(options->name, BLOCK_OPT_CLUSTER_SIZE)) {
            if (options->value.n) {
                objsize = options->value.n;
                if ((objsize - 1) & objsize) {    /* not a power of 2? */
                    error_report("obj size needs to be power of 2");
                    return -EINVAL;
                }
                if (objsize < 4096) {
                    error_report("obj size too small");
                    return -EINVAL;
                }
                /*
                 * obj_order = log2(objsize).  ffs() must not be used here:
                 * it takes an int, so a 64 bit object size of 2 GiB or more
                 * would be truncated before the bit search.
                 */
                obj_order = 0;
                while ((objsize >> obj_order) > 1) {
                    obj_order++;
                }
            }
        }
        options++;
    }

    if (rados_create(&cluster, NULL) < 0) {
        error_report("error initializing");
        return -EIO;
    }

    if (rados_conf_read_file(cluster, NULL) < 0) {
        error_report("error reading config file");
        ret = -EIO;
        goto out_shutdown;
    }

    if (rados_connect(cluster) < 0) {
        error_report("error connecting");
        ret = -EIO;
        goto out_shutdown;
    }

    if (rados_ioctx_create(cluster, pool, &io_ctx) < 0) {
        error_report("error opening pool %s", pool);
        ret = -EIO;
        goto out_shutdown;
    }

    ret = rbd_create(io_ctx, name, bytes, &obj_order);
    rados_ioctx_destroy(io_ctx);

out_shutdown:
    rados_shutdown(cluster);
    return ret;
}
232

    
233
/*
234
 * This aio completion is being called from qemu_rbd_aio_event_reader()
235
 * and runs in qemu context. It schedules a bh, but just in case the aio
236
 * was not cancelled before.
237
 */
238
static void qemu_rbd_complete_aio(RADOSCB *rcb)
239
{
240
    RBDAIOCB *acb = rcb->acb;
241
    int64_t r;
242

    
243
    if (acb->cancelled) {
244
        qemu_vfree(acb->bounce);
245
        qemu_aio_release(acb);
246
        goto done;
247
    }
248

    
249
    r = rcb->ret;
250

    
251
    if (acb->write) {
252
        if (r < 0) {
253
            acb->ret = r;
254
            acb->error = 1;
255
        } else if (!acb->error) {
256
            acb->ret = rcb->size;
257
        }
258
    } else {
259
        if (r < 0) {
260
            memset(rcb->buf, 0, rcb->size);
261
            acb->ret = r;
262
            acb->error = 1;
263
        } else if (r < rcb->size) {
264
            memset(rcb->buf + r, 0, rcb->size - r);
265
            if (!acb->error) {
266
                acb->ret = rcb->size;
267
            }
268
        } else if (!acb->error) {
269
            acb->ret = r;
270
        }
271
    }
272
    /* Note that acb->bh can be NULL in case where the aio was cancelled */
273
    acb->bh = qemu_bh_new(rbd_aio_bh_cb, acb);
274
    qemu_bh_schedule(acb->bh);
275
done:
276
    qemu_free(rcb);
277
}
278

    
279
/*
280
 * aio fd read handler. It runs in the qemu context and calls the
281
 * completion handling of completed rados aio operations.
282
 */
283
static void qemu_rbd_aio_event_reader(void *opaque)
284
{
285
    BDRVRBDState *s = opaque;
286

    
287
    ssize_t ret;
288

    
289
    do {
290
        char *p = (char *)&s->event_rcb;
291

    
292
        /* now read the rcb pointer that was sent from a non qemu thread */
293
        if ((ret = read(s->fds[RBD_FD_READ], p + s->event_reader_pos,
294
                        sizeof(s->event_rcb) - s->event_reader_pos)) > 0) {
295
            if (ret > 0) {
296
                s->event_reader_pos += ret;
297
                if (s->event_reader_pos == sizeof(s->event_rcb)) {
298
                    s->event_reader_pos = 0;
299
                    qemu_rbd_complete_aio(s->event_rcb);
300
                    s->qemu_aio_count--;
301
                }
302
            }
303
        }
304
    } while (ret < 0 && errno == EINTR);
305
}
306

    
307
static int qemu_rbd_aio_flush_cb(void *opaque)
308
{
309
    BDRVRBDState *s = opaque;
310

    
311
    return (s->qemu_aio_count > 0);
312
}
313

    
314
/*
 * Open the image named by filename ("rbd:pool/name[@snap]").
 *
 * Connects to the cluster, opens the pool and the image, and sets up the
 * non-blocking pipe used to hand completions over to the qemu thread.
 * An image opened at a snapshot is marked read-only.
 *
 * Returns 0 on success or a negative errno value.  On failure everything
 * acquired so far is released again, including the snapshot name copy in
 * s->snap.
 */
static int qemu_rbd_open(BlockDriverState *bs, const char *filename, int flags)
{
    BDRVRBDState *s = bs->opaque;
    char pool[RBD_MAX_POOL_NAME_SIZE];
    char snap_buf[RBD_MAX_SNAP_NAME_SIZE];
    int r;

    if (qemu_rbd_parsename(filename, pool, sizeof(pool),
                           snap_buf, sizeof(snap_buf),
                           s->name, sizeof(s->name)) < 0) {
        return -EINVAL;
    }
    s->snap = NULL;
    if (snap_buf[0] != '\0') {
        s->snap = qemu_strdup(snap_buf);
    }

    r = rados_create(&s->cluster, NULL);
    if (r < 0) {
        error_report("error initializing");
        goto failed_snap;
    }

    r = rados_conf_read_file(s->cluster, NULL);
    if (r < 0) {
        error_report("error reading config file");
        goto failed_shutdown;
    }

    r = rados_connect(s->cluster);
    if (r < 0) {
        error_report("error connecting");
        goto failed_shutdown;
    }

    r = rados_ioctx_create(s->cluster, pool, &s->io_ctx);
    if (r < 0) {
        error_report("error opening pool %s", pool);
        goto failed_shutdown;
    }

    r = rbd_open(s->io_ctx, s->name, &s->image, s->snap);
    if (r < 0) {
        error_report("error reading header from %s", s->name);
        goto failed_ioctx;
    }

    bs->read_only = (s->snap != NULL);

    s->event_reader_pos = 0;
    r = qemu_pipe(s->fds);
    if (r < 0) {
        error_report("error opening eventfd");
        goto failed_image;
    }
    fcntl(s->fds[0], F_SETFL, O_NONBLOCK);
    fcntl(s->fds[1], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], qemu_rbd_aio_event_reader,
                            NULL, qemu_rbd_aio_flush_cb, NULL, s);

    return 0;

    /* Unwind in reverse order of acquisition. */
failed_image:
    rbd_close(s->image);
failed_ioctx:
    rados_ioctx_destroy(s->io_ctx);
failed_shutdown:
    rados_shutdown(s->cluster);
failed_snap:
    qemu_free(s->snap);
    s->snap = NULL;
    return r;
}
388

    
389
/*
 * Close the image and release everything qemu_rbd_open() set up.
 *
 * The fd handler is unregistered before the pipe is closed, so no handler
 * is ever left registered for a descriptor number that has already been
 * closed and could be handed out again.
 */
static void qemu_rbd_close(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;

    qemu_aio_set_fd_handler(s->fds[RBD_FD_READ], NULL, NULL, NULL, NULL,
                            NULL);
    close(s->fds[0]);
    close(s->fds[1]);

    rbd_close(s->image);
    rados_ioctx_destroy(s->io_ctx);
    qemu_free(s->snap);
    rados_shutdown(s->cluster);
}
403

    
404
/*
405
 * Cancel aio. Since we don't reference acb in a non qemu threads,
406
 * it is safe to access it here.
407
 */
408
static void qemu_rbd_aio_cancel(BlockDriverAIOCB *blockacb)
409
{
410
    RBDAIOCB *acb = (RBDAIOCB *) blockacb;
411
    acb->cancelled = 1;
412
}
413

    
414
static AIOPool rbd_aio_pool = {
415
    .aiocb_size = sizeof(RBDAIOCB),
416
    .cancel = qemu_rbd_aio_cancel,
417
};
418

    
419
/*
 * Send the rcb pointer through the notification pipe to the qemu thread,
 * which performs the actual completion handling.
 *
 * An interrupted write is retried at once.  When the non-blocking pipe is
 * full (EAGAIN) the function waits in select() until it is writable and
 * then tries again.
 *
 * Returns the non-negative result of write() on success, or a negative
 * value if write() failed with any other error.
 */
static int qemu_rbd_send_pipe(BDRVRBDState *s, RADOSCB *rcb)
{
    const int fd = s->fds[RBD_FD_WRITE];
    int ret;

    for (;;) {
        fd_set wfd;

        ret = write(fd, (void *)&rcb, sizeof(rcb));
        if (ret >= 0) {
            break;
        }
        if (errno == EINTR) {
            continue;
        }
        if (errno != EAGAIN) {
            break;
        }

        /* pipe full: block until there is room again */
        FD_ZERO(&wfd);
        FD_SET(fd, &wfd);
        do {
            ret = select(fd + 1, NULL, &wfd, NULL, NULL);
        } while (ret < 0 && errno == EINTR);
    }

    return ret;
}
448

    
449
/*
450
 * This is the callback function for rbd_aio_read and _write
451
 *
452
 * Note: this function is being called from a non qemu thread so
453
 * we need to be careful about what we do here. Generally we only
454
 * write to the block notification pipe, and do the rest of the
455
 * io completion handling from qemu_rbd_aio_event_reader() which
456
 * runs in a qemu context.
457
 */
458
static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
459
{
460
    int ret;
461
    rcb->ret = rbd_aio_get_return_value(c);
462
    rbd_aio_release(c);
463
    ret = qemu_rbd_send_pipe(rcb->s, rcb);
464
    if (ret < 0) {
465
        error_report("failed writing to acb->s->fds");
466
        qemu_free(rcb);
467
    }
468
}
469

    
470
/* Callback when all queued rbd_aio requests are complete */
471

    
472
static void rbd_aio_bh_cb(void *opaque)
473
{
474
    RBDAIOCB *acb = opaque;
475

    
476
    if (!acb->write) {
477
        qemu_iovec_from_buffer(acb->qiov, acb->bounce, acb->qiov->size);
478
    }
479
    qemu_vfree(acb->bounce);
480
    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
481
    qemu_bh_delete(acb->bh);
482
    acb->bh = NULL;
483

    
484
    qemu_aio_release(acb);
485
}
486

    
487
static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
488
                                           int64_t sector_num,
489
                                           QEMUIOVector *qiov,
490
                                           int nb_sectors,
491
                                           BlockDriverCompletionFunc *cb,
492
                                           void *opaque, int write)
493
{
494
    RBDAIOCB *acb;
495
    RADOSCB *rcb;
496
    rbd_completion_t c;
497
    int64_t off, size;
498
    char *buf;
499

    
500
    BDRVRBDState *s = bs->opaque;
501

    
502
    acb = qemu_aio_get(&rbd_aio_pool, bs, cb, opaque);
503
    acb->write = write;
504
    acb->qiov = qiov;
505
    acb->bounce = qemu_blockalign(bs, qiov->size);
506
    acb->ret = 0;
507
    acb->error = 0;
508
    acb->s = s;
509
    acb->cancelled = 0;
510
    acb->bh = NULL;
511

    
512
    if (write) {
513
        qemu_iovec_to_buffer(acb->qiov, acb->bounce);
514
    }
515

    
516
    buf = acb->bounce;
517

    
518
    off = sector_num * BDRV_SECTOR_SIZE;
519
    size = nb_sectors * BDRV_SECTOR_SIZE;
520

    
521
    s->qemu_aio_count++; /* All the RADOSCB */
522

    
523
    rcb = qemu_malloc(sizeof(RADOSCB));
524
    rcb->done = 0;
525
    rcb->acb = acb;
526
    rcb->buf = buf;
527
    rcb->s = acb->s;
528
    rcb->size = size;
529

    
530
    if (write) {
531
        rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
532
        rbd_aio_write(s->image, off, size, buf, c);
533
    } else {
534
        rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
535
        rbd_aio_read(s->image, off, size, buf, c);
536
    }
537

    
538
    return &acb->common;
539
}
540

    
541
static BlockDriverAIOCB *qemu_rbd_aio_readv(BlockDriverState *bs,
542
                                            int64_t sector_num,
543
                                            QEMUIOVector *qiov,
544
                                            int nb_sectors,
545
                                            BlockDriverCompletionFunc *cb,
546
                                            void *opaque)
547
{
548
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
549
}
550

    
551
static BlockDriverAIOCB *qemu_rbd_aio_writev(BlockDriverState *bs,
552
                                             int64_t sector_num,
553
                                             QEMUIOVector *qiov,
554
                                             int nb_sectors,
555
                                             BlockDriverCompletionFunc *cb,
556
                                             void *opaque)
557
{
558
    return rbd_aio_rw_vector(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
559
}
560

    
561
/*
 * Fill in bdi->cluster_size with the object size reported by rbd_stat().
 * Returns 0 on success or the negative value returned by rbd_stat().
 */
static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
{
    BDRVRBDState *s = bs->opaque;
    rbd_image_info_t image_info;
    int rc = rbd_stat(s->image, &image_info, sizeof(image_info));

    if (rc >= 0) {
        bdi->cluster_size = image_info.obj_size;
        rc = 0;
    }
    return rc;
}
575

    
576
/*
 * Return the image size reported by rbd_stat(), or the negative value
 * returned by rbd_stat() if it fails.
 */
static int64_t qemu_rbd_getlength(BlockDriverState *bs)
{
    BDRVRBDState *s = bs->opaque;
    rbd_image_info_t image_info;
    int64_t length;
    int rc;

    rc = rbd_stat(s->image, &image_info, sizeof(image_info));
    if (rc < 0) {
        length = rc;
    } else {
        length = image_info.size;
    }
    return length;
}
589

    
590
static int qemu_rbd_snap_create(BlockDriverState *bs,
591
                                QEMUSnapshotInfo *sn_info)
592
{
593
    BDRVRBDState *s = bs->opaque;
594
    int r;
595

    
596
    if (sn_info->name[0] == '\0') {
597
        return -EINVAL; /* we need a name for rbd snapshots */
598
    }
599

    
600
    /*
601
     * rbd snapshots are using the name as the user controlled unique identifier
602
     * we can't use the rbd snapid for that purpose, as it can't be set
603
     */
604
    if (sn_info->id_str[0] != '\0' &&
605
        strcmp(sn_info->id_str, sn_info->name) != 0) {
606
        return -EINVAL;
607
    }
608

    
609
    if (strlen(sn_info->name) >= sizeof(sn_info->id_str)) {
610
        return -ERANGE;
611
    }
612

    
613
    r = rbd_snap_create(s->image, sn_info->name);
614
    if (r < 0) {
615
        error_report("failed to create snap: %s", strerror(-r));
616
        return r;
617
    }
618

    
619
    return 0;
620
}
621

    
622
static int qemu_rbd_snap_list(BlockDriverState *bs,
623
                              QEMUSnapshotInfo **psn_tab)
624
{
625
    BDRVRBDState *s = bs->opaque;
626
    QEMUSnapshotInfo *sn_info, *sn_tab = NULL;
627
    int i, snap_count;
628
    rbd_snap_info_t *snaps;
629
    int max_snaps = RBD_MAX_SNAPS;
630

    
631
    do {
632
        snaps = qemu_malloc(sizeof(*snaps) * max_snaps);
633
        snap_count = rbd_snap_list(s->image, snaps, &max_snaps);
634
        if (snap_count < 0) {
635
            qemu_free(snaps);
636
        }
637
    } while (snap_count == -ERANGE);
638

    
639
    if (snap_count <= 0) {
640
        return snap_count;
641
    }
642

    
643
    sn_tab = qemu_mallocz(snap_count * sizeof(QEMUSnapshotInfo));
644

    
645
    for (i = 0; i < snap_count; i++) {
646
        const char *snap_name = snaps[i].name;
647

    
648
        sn_info = sn_tab + i;
649
        pstrcpy(sn_info->id_str, sizeof(sn_info->id_str), snap_name);
650
        pstrcpy(sn_info->name, sizeof(sn_info->name), snap_name);
651

    
652
        sn_info->vm_state_size = snaps[i].size;
653
        sn_info->date_sec = 0;
654
        sn_info->date_nsec = 0;
655
        sn_info->vm_clock_nsec = 0;
656
    }
657
    rbd_snap_list_end(snaps);
658

    
659
    *psn_tab = sn_tab;
660
    return snap_count;
661
}
662

    
663
static QEMUOptionParameter qemu_rbd_create_options[] = {
664
    {
665
     .name = BLOCK_OPT_SIZE,
666
     .type = OPT_SIZE,
667
     .help = "Virtual disk size"
668
    },
669
    {
670
     .name = BLOCK_OPT_CLUSTER_SIZE,
671
     .type = OPT_SIZE,
672
     .help = "RBD object size"
673
    },
674
    {NULL}
675
};
676

    
677
static BlockDriver bdrv_rbd = {
678
    .format_name        = "rbd",
679
    .instance_size      = sizeof(BDRVRBDState),
680
    .bdrv_file_open     = qemu_rbd_open,
681
    .bdrv_close         = qemu_rbd_close,
682
    .bdrv_create        = qemu_rbd_create,
683
    .bdrv_get_info      = qemu_rbd_getinfo,
684
    .create_options     = qemu_rbd_create_options,
685
    .bdrv_getlength     = qemu_rbd_getlength,
686
    .protocol_name      = "rbd",
687

    
688
    .bdrv_aio_readv     = qemu_rbd_aio_readv,
689
    .bdrv_aio_writev    = qemu_rbd_aio_writev,
690

    
691
    .bdrv_snapshot_create = qemu_rbd_snap_create,
692
    .bdrv_snapshot_list = qemu_rbd_snap_list,
693
};
694

    
695
static void bdrv_rbd_init(void)
696
{
697
    bdrv_register(&bdrv_rbd);
698
}
699

    
700
block_init(bdrv_rbd_init);