Statistics
| Branch: | Revision:

root / block / gluster.c @ 1de7afc9

History | View | Annotate | Download (17 kB)

1
/*
2
 * GlusterFS backend for QEMU
3
 *
4
 * Copyright (C) 2012 Bharata B Rao <bharata@linux.vnet.ibm.com>
5
 *
6
 * Pipe handling mechanism in AIO implementation is derived from
7
 * block/rbd.c. Hence,
8
 *
9
 * Copyright (C) 2010-2011 Christian Brunner <chb@muc.de>,
10
 *                         Josh Durgin <josh.durgin@dreamhost.com>
11
 *
12
 * This work is licensed under the terms of the GNU GPL, version 2.  See
13
 * the COPYING file in the top-level directory.
14
 *
15
 * Contributions after 2012-01-13 are licensed under the terms of the
16
 * GNU GPL, version 2 or (at your option) any later version.
17
 */
18
#include <glusterfs/api/glfs.h>
19
#include "block/block_int.h"
20
#include "qemu/sockets.h"
21
#include "qemu/uri.h"
22

    
23
/*
 * Per-request state for one asynchronous gluster I/O operation.
 */
typedef struct GlusterAIOCB {
    BlockDriverAIOCB common;    /* generic AIOCB header; must be first so the
                                   block layer's AIOCB pointer can be cast to
                                   GlusterAIOCB (see qemu_gluster_aio_cancel) */
    int64_t size;               /* requested transfer size in bytes; compared
                                   against the gluster return value to detect
                                   short reads/writes */
    int ret;                    /* raw gluster result: bytes done or -errno */
    bool *finished;             /* when non-NULL, set to true on completion so
                                   a cancelling thread can stop waiting */
    QEMUBH *bh;                 /* NOTE(review): not referenced anywhere in
                                   this file — verify whether it is still
                                   needed */
} GlusterAIOCB;
30

    
31
/*
 * Per-BlockDriverState data for the gluster protocol driver.
 */
typedef struct BDRVGlusterState {
    struct glfs *glfs;          /* gluster connection handle */
    int fds[2];                 /* completion pipe: gluster callback threads
                                   write AIOCB pointers to the
                                   GLUSTER_FD_WRITE end, the QEMU event loop
                                   reads them from GLUSTER_FD_READ */
    struct glfs_fd *fd;         /* open gluster file handle for the image */
    int qemu_aio_count;         /* number of in-flight AIO requests */
    int event_reader_pos;       /* bytes of the AIOCB pointer consumed so far
                                   from the pipe (reads may be partial) */
    GlusterAIOCB *event_acb;    /* AIOCB pointer being reassembled from the
                                   pipe */
} BDRVGlusterState;
39

    
40
/* Indexes into BDRVGlusterState.fds: read end is polled by the QEMU event
 * loop, write end is used by gluster completion callbacks. */
#define GLUSTER_FD_READ  0
#define GLUSTER_FD_WRITE 1
42

    
43
/*
 * Connection parameters parsed out of a gluster URI; see
 * qemu_gluster_parseuri() for the accepted syntax.
 */
typedef struct GlusterConf {
    char *server;       /* hostname/IP, or the unix socket path when the
                           transport is "unix" */
    int port;           /* glusterd port; 0 lets gluster pick its default */
    char *volname;      /* gluster volume containing the image */
    char *image;        /* path of the image inside the volume */
    char *transport;    /* "tcp", "unix" or "rdma" */
} GlusterConf;
50

    
51
/*
 * Release a GlusterConf and every string it owns.
 *
 * All members are allocated with g_strdup()/g_strndup(); g_free() accepts
 * NULL, so partially populated configurations (e.g. after a parse failure)
 * are handled without extra checks.
 */
static void qemu_gluster_gconf_free(GlusterConf *gconf)
{
    g_free(gconf->transport);
    g_free(gconf->image);
    g_free(gconf->volname);
    g_free(gconf->server);
    g_free(gconf);
}
59

    
60
/*
 * Split a URI path of the form "/volname/image..." into gconf->volname
 * and gconf->image.
 *
 * Leading '/' characters are skipped; the first path component becomes the
 * volume name, and everything after the separating slashes becomes the
 * image path.  Returns 0 on success, -EINVAL if the path is NULL or either
 * component is missing.
 */
static int parse_volume_options(GlusterConf *gconf, char *path)
{
    char *start, *end;

    if (path == NULL) {
        return -EINVAL;
    }

    /* volume name: first component after any leading slashes */
    start = path + strspn(path, "/");
    end = start + strcspn(start, "/");
    if (*end == '\0') {
        return -EINVAL;
    }
    gconf->volname = g_strndup(start, end - start);

    /* image path: remainder beyond the separating slashes */
    end += strspn(end, "/");
    if (*end == '\0') {
        return -EINVAL;
    }
    gconf->image = g_strdup(end);
    return 0;
}
84

    
85
/*
86
 * file=gluster[+transport]://[server[:port]]/volname/image[?socket=...]
87
 *
88
 * 'gluster' is the protocol.
89
 *
90
 * 'transport' specifies the transport type used to connect to gluster
91
 * management daemon (glusterd). Valid transport types are
92
 * tcp, unix and rdma. If a transport type isn't specified, then tcp
93
 * type is assumed.
94
 *
95
 * 'server' specifies the server where the volume file specification for
96
 * the given volume resides. This can be either hostname, ipv4 address
97
 * or ipv6 address. ipv6 address needs to be within square brackets [ ].
98
 * If transport type is 'unix', then 'server' field should not be specified.
99
 * The 'socket' field needs to be populated with the path to unix domain
100
 * socket.
101
 *
102
 * 'port' is the port number on which glusterd is listening. This is optional
103
 * and if not specified, QEMU will send 0 which will make gluster to use the
104
 * default port. If the transport type is unix, then 'port' should not be
105
 * specified.
106
 *
107
 * 'volname' is the name of the gluster volume which contains the VM image.
108
 *
109
 * 'image' is the path to the actual VM image that resides on gluster volume.
110
 *
111
 * Examples:
112
 *
113
 * file=gluster://1.2.3.4/testvol/a.img
114
 * file=gluster+tcp://1.2.3.4/testvol/a.img
115
 * file=gluster+tcp://1.2.3.4:24007/testvol/dir/a.img
116
 * file=gluster+tcp://[1:2:3:4:5:6:7:8]/testvol/dir/a.img
117
 * file=gluster+tcp://[1:2:3:4:5:6:7:8]:24007/testvol/dir/a.img
118
 * file=gluster+tcp://server.domain.com:24007/testvol/dir/a.img
119
 * file=gluster+unix:///testvol/dir/a.img?socket=/tmp/glusterd.socket
120
 * file=gluster+rdma://1.2.3.4:24007/testvol/a.img
121
 */
122
/*
 * Parse a gluster URI (syntax documented above) into *gconf.
 *
 * Fills gconf->transport, ->volname, ->image, and either ->server/->port
 * (tcp/rdma) or ->server as a unix socket path.  Returns 0 on success,
 * -EINVAL on any malformed URI.  Strings already written to gconf on a
 * failure path remain owned by the caller's qemu_gluster_gconf_free().
 */
static int qemu_gluster_parseuri(GlusterConf *gconf, const char *filename)
{
    URI *uri;
    QueryParams *qp = NULL;
    bool is_unix = false;
    int ret = 0;

    uri = uri_parse(filename);
    if (!uri) {
        return -EINVAL;
    }

    /* transport: derived from the URI scheme; bare "gluster" means tcp */
    if (!strcmp(uri->scheme, "gluster")) {
        gconf->transport = g_strdup("tcp");
    } else if (!strcmp(uri->scheme, "gluster+tcp")) {
        gconf->transport = g_strdup("tcp");
    } else if (!strcmp(uri->scheme, "gluster+unix")) {
        gconf->transport = g_strdup("unix");
        is_unix = true;
    } else if (!strcmp(uri->scheme, "gluster+rdma")) {
        gconf->transport = g_strdup("rdma");
    } else {
        ret = -EINVAL;
        goto out;
    }

    ret = parse_volume_options(gconf, uri->path);
    if (ret < 0) {
        goto out;
    }

    /*
     * The only accepted query parameter is "socket", and only for the unix
     * transport: exactly one parameter there, none for tcp/rdma.
     */
    qp = query_params_parse(uri->query);
    if (qp->n > 1 || (is_unix && !qp->n) || (!is_unix && qp->n)) {
        ret = -EINVAL;
        goto out;
    }

    if (is_unix) {
        /* unix transport: no host/port; socket path travels in 'server' */
        if (uri->server || uri->port) {
            ret = -EINVAL;
            goto out;
        }
        if (strcmp(qp->p[0].name, "socket")) {
            ret = -EINVAL;
            goto out;
        }
        gconf->server = g_strdup(qp->p[0].value);
    } else {
        gconf->server = g_strdup(uri->server);
        gconf->port = uri->port;
    }

out:
    if (qp) {
        query_params_free(qp);
    }
    uri_free(uri);
    return ret;
}
182

    
183
/*
 * Connect to the gluster volume named by 'filename'.
 *
 * Parses the URI into *gconf, creates a glfs instance, points it at the
 * volfile server and initializes the connection.  Returns the connected
 * glfs handle on success.  On failure returns NULL with errno describing
 * the error; errno is saved and restored around glfs_fini() so the cleanup
 * cannot clobber it before the caller reads -errno.
 */
static struct glfs *qemu_gluster_init(GlusterConf *gconf, const char *filename)
{
    struct glfs *glfs = NULL;
    int ret;
    int old_errno;

    ret = qemu_gluster_parseuri(gconf, filename);
    if (ret < 0) {
        error_report("Usage: file=gluster[+transport]://[server[:port]]/"
            "volname/image[?socket=...]");
        /* propagate the parse error through errno for the NULL return */
        errno = -ret;
        goto out;
    }

    glfs = glfs_new(gconf->volname);
    if (!glfs) {
        goto out;
    }

    ret = glfs_set_volfile_server(glfs, gconf->transport, gconf->server,
            gconf->port);
    if (ret < 0) {
        goto out;
    }

    /*
     * TODO: Use GF_LOG_ERROR instead of hard code value of 4 here when
     * GlusterFS makes GF_LOG_* macros available to libgfapi users.
     */
    ret = glfs_set_logging(glfs, "-", 4);
    if (ret < 0) {
        goto out;
    }

    /* NOTE(review): error_report() conventionally supplies its own newline;
     * the trailing "\n" below looks redundant — confirm before changing the
     * message text. */
    ret = glfs_init(glfs);
    if (ret) {
        error_report("Gluster connection failed for server=%s port=%d "
             "volume=%s image=%s transport=%s\n", gconf->server, gconf->port,
             gconf->volname, gconf->image, gconf->transport);
        goto out;
    }
    return glfs;

out:
    if (glfs) {
        old_errno = errno;
        glfs_fini(glfs);
        errno = old_errno;
    }
    return NULL;
}
234

    
235
/*
 * Complete one AIO request on the QEMU thread.
 *
 * Translates the raw gluster result into the block layer's 0/-errno
 * convention (a transfer shorter than requested becomes -EIO), drops the
 * in-flight count, releases the AIOCB and invokes the caller's completion
 * callback.  cb/opaque/finished are copied out first because
 * qemu_aio_release() frees acb before they are used.  *finished is set
 * last so a cancelling thread (qemu_gluster_aio_cancel) only resumes
 * after the callback has run.
 */
static void qemu_gluster_complete_aio(GlusterAIOCB *acb, BDRVGlusterState *s)
{
    int ret;
    bool *finished = acb->finished;
    BlockDriverCompletionFunc *cb = acb->common.cb;
    void *opaque = acb->common.opaque;

    if (!acb->ret || acb->ret == acb->size) {
        ret = 0; /* Success */
    } else if (acb->ret < 0) {
        ret = acb->ret; /* Read/Write failed */
    } else {
        ret = -EIO; /* Partial read/write - fail it */
    }

    s->qemu_aio_count--;
    qemu_aio_release(acb);
    cb(opaque, ret);
    if (finished) {
        *finished = true;
    }
}
257

    
258
/*
 * fd handler for the read end of the completion pipe.
 *
 * Gluster callback threads write a raw GlusterAIOCB pointer into the pipe
 * (see gluster_finish_aiocb).  The pointer may arrive in several chunks,
 * so event_reader_pos tracks how many bytes have been assembled into
 * s->event_acb; once a whole pointer is in, the request is completed.
 * Interrupted reads (EINTR) are retried.
 */
static void qemu_gluster_aio_event_reader(void *opaque)
{
    BDRVGlusterState *s = opaque;
    ssize_t ret;

    do {
        /* byte-wise destination for the partially-read pointer value */
        char *p = (char *)&s->event_acb;

        ret = read(s->fds[GLUSTER_FD_READ], p + s->event_reader_pos,
                   sizeof(s->event_acb) - s->event_reader_pos);
        if (ret > 0) {
            s->event_reader_pos += ret;
            if (s->event_reader_pos == sizeof(s->event_acb)) {
                s->event_reader_pos = 0;
                qemu_gluster_complete_aio(s->event_acb, s);
            }
        }
    } while (ret < 0 && errno == EINTR);
}
277

    
278
static int qemu_gluster_aio_flush_cb(void *opaque)
279
{
280
    BDRVGlusterState *s = opaque;
281

    
282
    return (s->qemu_aio_count > 0);
283
}
284

    
285
/*
 * Open an image that lives on a gluster volume.
 *
 * Establishes the gluster connection (qemu_gluster_init), opens the image
 * with flags derived from bdrv_flags, then creates the completion pipe and
 * registers its read end with the AIO layer.  Returns 0 on success or
 * -errno; on failure, whatever was already acquired (gluster fd,
 * connection) is torn down before returning.
 */
static int qemu_gluster_open(BlockDriverState *bs, const char *filename,
    int bdrv_flags)
{
    BDRVGlusterState *s = bs->opaque;
    int open_flags = O_BINARY;
    int ret = 0;
    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));

    s->glfs = qemu_gluster_init(gconf, filename);
    if (!s->glfs) {
        ret = -errno;
        goto out;
    }

    /* map block-layer open flags onto POSIX-style open flags */
    if (bdrv_flags & BDRV_O_RDWR) {
        open_flags |= O_RDWR;
    } else {
        open_flags |= O_RDONLY;
    }

    if ((bdrv_flags & BDRV_O_NOCACHE)) {
        open_flags |= O_DIRECT;
    }

    s->fd = glfs_open(s->glfs, gconf->image, open_flags);
    if (!s->fd) {
        ret = -errno;
        goto out;
    }

    /* completion pipe: gluster threads signal the event loop through it */
    ret = qemu_pipe(s->fds);
    if (ret < 0) {
        ret = -errno;
        goto out;
    }
    fcntl(s->fds[GLUSTER_FD_READ], F_SETFL, O_NONBLOCK);
    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ],
        qemu_gluster_aio_event_reader, NULL, qemu_gluster_aio_flush_cb, s);

out:
    /* gconf is only needed during open; free it on every path */
    qemu_gluster_gconf_free(gconf);
    if (!ret) {
        return ret;
    }
    /* failure: release whatever was acquired before the error */
    if (s->fd) {
        glfs_close(s->fd);
    }
    if (s->glfs) {
        glfs_fini(s->glfs);
    }
    return ret;
}
337

    
338
/*
 * Create a new image on a gluster volume.
 *
 * Only the BLOCK_OPT_SIZE creation option is honoured; the image is
 * created empty (owner read/write) and truncated to the requested size.
 * Returns 0 on success or -errno.
 */
static int qemu_gluster_create(const char *filename,
        QEMUOptionParameter *options)
{
    struct glfs *glfs;
    struct glfs_fd *fd;
    int ret = 0;
    int64_t total_size = 0;
    GlusterConf *gconf = g_malloc0(sizeof(GlusterConf));

    glfs = qemu_gluster_init(gconf, filename);
    if (!glfs) {
        ret = -errno;
        goto out;
    }

    /* scan the option list for the requested virtual size (in sectors) */
    while (options && options->name) {
        if (!strcmp(options->name, BLOCK_OPT_SIZE)) {
            total_size = options->value.n / BDRV_SECTOR_SIZE;
        }
        options++;
    }

    fd = glfs_creat(glfs, gconf->image,
        O_WRONLY | O_CREAT | O_TRUNC | O_BINARY, S_IRUSR | S_IWUSR);
    if (!fd) {
        ret = -errno;
    } else {
        if (glfs_ftruncate(fd, total_size * BDRV_SECTOR_SIZE) != 0) {
            ret = -errno;
        }
        /* glfs_close flushes; a close failure also fails the create */
        if (glfs_close(fd) != 0) {
            ret = -errno;
        }
    }
out:
    qemu_gluster_gconf_free(gconf);
    if (glfs) {
        glfs_fini(glfs);
    }
    return ret;
}
379

    
380
/*
 * Cancel an in-flight request.
 *
 * There is no visible way to abort a request already submitted to
 * gluster, so this blocks in qemu_aio_wait() until the request completes;
 * qemu_gluster_complete_aio() sets *acb->finished as its final step,
 * which ends the wait loop.
 */
static void qemu_gluster_aio_cancel(BlockDriverAIOCB *blockacb)
{
    GlusterAIOCB *acb = (GlusterAIOCB *)blockacb;
    bool finished = false;

    acb->finished = &finished;
    while (!finished) {
        qemu_aio_wait();
    }
}
390

    
391
/* AIOCB allocation size and cancel hook for this driver's requests */
static const AIOCBInfo gluster_aiocb_info = {
    .aiocb_size = sizeof(GlusterAIOCB),
    .cancel = qemu_gluster_aio_cancel,
};
395

    
396
/*
 * Completion callback invoked by gluster from one of its own threads.
 *
 * Records the result in the AIOCB and notifies the QEMU event loop by
 * writing the AIOCB pointer into the completion pipe.  If that write
 * fails there is no way to deliver the completion normally, so the
 * request is failed with -EIO under the iothread lock (we are not on a
 * QEMU thread here) and the disk is made inaccessible by clearing
 * bs->drv so no further I/O is issued.
 */
static void gluster_finish_aiocb(struct glfs_fd *fd, ssize_t ret, void *arg)
{
    GlusterAIOCB *acb = (GlusterAIOCB *)arg;
    BlockDriverState *bs = acb->common.bs;
    BDRVGlusterState *s = bs->opaque;
    int retval;

    acb->ret = ret;
    retval = qemu_write_full(s->fds[GLUSTER_FD_WRITE], &acb, sizeof(acb));
    if (retval != sizeof(acb)) {
        /*
         * Gluster AIO callback thread failed to notify the waiting
         * QEMU thread about IO completion.
         *
         * Complete this IO request and make the disk inaccessible for
         * subsequent reads and writes.
         */
        error_report("Gluster failed to notify QEMU about IO completion");

        qemu_mutex_lock_iothread(); /* We are in gluster thread context */
        acb->common.cb(acb->common.opaque, -EIO);
        qemu_aio_release(acb);
        s->qemu_aio_count--;
        close(s->fds[GLUSTER_FD_READ]);
        close(s->fds[GLUSTER_FD_WRITE]);
        qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL,
            NULL);
        bs->drv = NULL; /* Make the disk inaccessible */
        qemu_mutex_unlock_iothread();
    }
}
427

    
428
/*
 * Common submission path for qemu_gluster_aio_readv/_writev.
 *
 * Converts sector_num/nb_sectors into a byte offset and size, allocates
 * an AIOCB and submits an asynchronous preadv/pwritev to gluster;
 * gluster_finish_aiocb fires on completion.  Returns the AIOCB, or NULL
 * if submission failed (in which case the in-flight count is rolled back
 * and the AIOCB released).
 *
 * NOTE(review): nb_sectors * BDRV_SECTOR_SIZE is evaluated in int before
 * widening to size_t — confirm callers bound nb_sectors well below
 * INT_MAX / BDRV_SECTOR_SIZE.
 */
static BlockDriverAIOCB *qemu_gluster_aio_rw(BlockDriverState *bs,
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
        BlockDriverCompletionFunc *cb, void *opaque, int write)
{
    int ret;
    GlusterAIOCB *acb;
    BDRVGlusterState *s = bs->opaque;
    size_t size;
    off_t offset;

    offset = sector_num * BDRV_SECTOR_SIZE;
    size = nb_sectors * BDRV_SECTOR_SIZE;
    s->qemu_aio_count++;

    acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque);
    acb->size = size;
    acb->ret = 0;
    acb->finished = NULL;

    if (write) {
        ret = glfs_pwritev_async(s->fd, qiov->iov, qiov->niov, offset, 0,
            &gluster_finish_aiocb, acb);
    } else {
        ret = glfs_preadv_async(s->fd, qiov->iov, qiov->niov, offset, 0,
            &gluster_finish_aiocb, acb);
    }

    if (ret < 0) {
        goto out;
    }
    return &acb->common;

out:
    /* submission failed: undo the bookkeeping done above */
    s->qemu_aio_count--;
    qemu_aio_release(acb);
    return NULL;
}
465

    
466
static BlockDriverAIOCB *qemu_gluster_aio_readv(BlockDriverState *bs,
467
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
468
        BlockDriverCompletionFunc *cb, void *opaque)
469
{
470
    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 0);
471
}
472

    
473
static BlockDriverAIOCB *qemu_gluster_aio_writev(BlockDriverState *bs,
474
        int64_t sector_num, QEMUIOVector *qiov, int nb_sectors,
475
        BlockDriverCompletionFunc *cb, void *opaque)
476
{
477
    return qemu_gluster_aio_rw(bs, sector_num, qiov, nb_sectors, cb, opaque, 1);
478
}
479

    
480
/*
 * Submit an asynchronous flush (fsync) of the image.
 *
 * Allocates a zero-sized AIOCB (size 0 matches ret 0 on success in
 * qemu_gluster_complete_aio) and submits glfs_fsync_async with the same
 * completion path as reads/writes.  Returns the AIOCB, or NULL if
 * submission failed.
 */
static BlockDriverAIOCB *qemu_gluster_aio_flush(BlockDriverState *bs,
        BlockDriverCompletionFunc *cb, void *opaque)
{
    int ret;
    GlusterAIOCB *acb;
    BDRVGlusterState *s = bs->opaque;

    acb = qemu_aio_get(&gluster_aiocb_info, bs, cb, opaque);
    acb->size = 0;
    acb->ret = 0;
    acb->finished = NULL;
    s->qemu_aio_count++;

    ret = glfs_fsync_async(s->fd, &gluster_finish_aiocb, acb);
    if (ret < 0) {
        goto out;
    }
    return &acb->common;

out:
    /* submission failed: undo the bookkeeping done above */
    s->qemu_aio_count--;
    qemu_aio_release(acb);
    return NULL;
}
504

    
505
/*
 * Return the size of the image in bytes, determined by seeking to the
 * end of the gluster file; -errno on failure.
 */
static int64_t qemu_gluster_getlength(BlockDriverState *bs)
{
    BDRVGlusterState *s = bs->opaque;
    int64_t end;

    end = glfs_lseek(s->fd, 0, SEEK_END);
    return (end < 0) ? -errno : end;
}
517

    
518
/*
 * Return the actually allocated size of the image in bytes, or -errno.
 */
static int64_t qemu_gluster_allocated_file_size(BlockDriverState *bs)
{
    BDRVGlusterState *s = bs->opaque;
    struct stat st;

    if (glfs_fstat(s->fd, &st) < 0) {
        return -errno;
    }
    /* st_blocks counts 512-byte units regardless of the filesystem block
     * size */
    return st.st_blocks * 512;
}
531

    
532
/*
 * Close the image: tear down the completion pipe, then release the
 * gluster file handle and the connection.
 *
 * The AIO fd handler is removed *before* the pipe fds are closed so the
 * event loop can never be left polling, or removing a handler for, a
 * file descriptor that has already been closed (and possibly reused by a
 * concurrent open).  The original code closed the fds first.
 */
static void qemu_gluster_close(BlockDriverState *bs)
{
    BDRVGlusterState *s = bs->opaque;

    qemu_aio_set_fd_handler(s->fds[GLUSTER_FD_READ], NULL, NULL, NULL, NULL);
    close(s->fds[GLUSTER_FD_READ]);
    close(s->fds[GLUSTER_FD_WRITE]);

    if (s->fd) {
        glfs_close(s->fd);
        s->fd = NULL;
    }
    glfs_fini(s->glfs);
}
546

    
547
/* Creation options accepted by qemu_gluster_create(); size only */
static QEMUOptionParameter qemu_gluster_create_options[] = {
    {
        .name = BLOCK_OPT_SIZE,
        .type = OPT_SIZE,
        .help = "Virtual disk size"
    },
    { NULL }
};
555

    
556
/* Driver for bare "gluster://" URIs (transport defaults to tcp) */
static BlockDriver bdrv_gluster = {
    .format_name                  = "gluster",
    .protocol_name                = "gluster",
    .instance_size                = sizeof(BDRVGlusterState),
    .bdrv_file_open               = qemu_gluster_open,
    .bdrv_close                   = qemu_gluster_close,
    .bdrv_create                  = qemu_gluster_create,
    .bdrv_getlength               = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_aio_readv               = qemu_gluster_aio_readv,
    .bdrv_aio_writev              = qemu_gluster_aio_writev,
    .bdrv_aio_flush               = qemu_gluster_aio_flush,
    .create_options               = qemu_gluster_create_options,
};
570

    
571
/* Driver for explicit "gluster+tcp://" URIs */
static BlockDriver bdrv_gluster_tcp = {
    .format_name                  = "gluster",
    .protocol_name                = "gluster+tcp",
    .instance_size                = sizeof(BDRVGlusterState),
    .bdrv_file_open               = qemu_gluster_open,
    .bdrv_close                   = qemu_gluster_close,
    .bdrv_create                  = qemu_gluster_create,
    .bdrv_getlength               = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_aio_readv               = qemu_gluster_aio_readv,
    .bdrv_aio_writev              = qemu_gluster_aio_writev,
    .bdrv_aio_flush               = qemu_gluster_aio_flush,
    .create_options               = qemu_gluster_create_options,
};
585

    
586
/* Driver for "gluster+unix://" URIs (unix domain socket transport) */
static BlockDriver bdrv_gluster_unix = {
    .format_name                  = "gluster",
    .protocol_name                = "gluster+unix",
    .instance_size                = sizeof(BDRVGlusterState),
    .bdrv_file_open               = qemu_gluster_open,
    .bdrv_close                   = qemu_gluster_close,
    .bdrv_create                  = qemu_gluster_create,
    .bdrv_getlength               = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_aio_readv               = qemu_gluster_aio_readv,
    .bdrv_aio_writev              = qemu_gluster_aio_writev,
    .bdrv_aio_flush               = qemu_gluster_aio_flush,
    .create_options               = qemu_gluster_create_options,
};
600

    
601
/* Driver for "gluster+rdma://" URIs (RDMA transport) */
static BlockDriver bdrv_gluster_rdma = {
    .format_name                  = "gluster",
    .protocol_name                = "gluster+rdma",
    .instance_size                = sizeof(BDRVGlusterState),
    .bdrv_file_open               = qemu_gluster_open,
    .bdrv_close                   = qemu_gluster_close,
    .bdrv_create                  = qemu_gluster_create,
    .bdrv_getlength               = qemu_gluster_getlength,
    .bdrv_get_allocated_file_size = qemu_gluster_allocated_file_size,
    .bdrv_aio_readv               = qemu_gluster_aio_readv,
    .bdrv_aio_writev              = qemu_gluster_aio_writev,
    .bdrv_aio_flush               = qemu_gluster_aio_flush,
    .create_options               = qemu_gluster_create_options,
};
615

    
616
/*
 * Register all four gluster protocol driver variants with the block
 * layer at startup.
 */
static void bdrv_gluster_init(void)
{
    bdrv_register(&bdrv_gluster_rdma);
    bdrv_register(&bdrv_gluster_unix);
    bdrv_register(&bdrv_gluster_tcp);
    bdrv_register(&bdrv_gluster);
}

/* run the registration function during QEMU's block-driver init phase */
block_init(bdrv_gluster_init);