Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / glusterd.c @ feature-glusterd

History | View | Annotate | Download (33.2 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <stdlib.h>
37
#include <unistd.h>
38
#include <xseg/xseg.h>
39
#include <peer.h>
40
#include <xseg/protocol.h>
41
#include <glusterfs/api/glfs.h>
42
#include <pthread.h>
43
#include <ctype.h>
44
#include <errno.h>
45
#include <hash.h>
46
#include <sys/types.h>
47
#include <unistd.h>
48

    
49

    
50
#define MAX_UNIQUESTR_LEN 128
51
#define LOCK_SUFFIX "_lock"
52
#define LOCK_SUFFIX_LEN 5
53
#define HASH_SUFFIX "_hash"
54
#define HASH_SUFFIX_LEN 5
55

    
56
#define MAX_OBJ_NAME (XSEG_MAX_TARGETLEN + LOCK_SUFFIX_LEN + 1)
57
#define MAX_GLFS_ARG_LEN 64
58

    
59
void custom_peer_usage()
60
{
61
         fprintf(stderr, "Custom peer options:\n"
62
                "  Option        | Default    | \n"
63
                "  --------------------------------------------\n"
64
                "    --transport | tcp        | Transport type [tcp|rdma|unix]\n"
65
                "    --server    | 127.0.0.1  | A server that is part of the\n"
66
                "                |            | volume\n"
67
                "    --port      | 0(=24007)  | Port of server's gluster daemon\n"
68
                "    --volume    | None       | Gluster volume to connect to\n"
69
                "    --uniquestr | pid        | Unique string for this instance\n"
70
                "    --async     | 0          | If 1, glusted will use the async\n"
71
                "                |            | equivalent functions\n"
72
                "\n"
73
               );
74
}
75

    
76
#define REQ_UNDEFINED -2
77
#define REQ_FAILED -1
78
#define REQ_SUBMITTED 0
79
#define REQ_COMPLETED 1
80

    
81
enum gluster_state {
82
        ACCEPTED = 0,
83
        PENDING = 1,
84
        READING = 2,
85
        WRITING = 3,
86
        STATING = 4,
87
        PREHASHING = 5,
88
        POSTHASHING = 6
89
};
90

    
91
struct glusterd {
92
        glfs_t *glfs;
93
        char uniquestr[MAX_UNIQUESTR_LEN + 1];
94
        int uniquestr_len;
95
        int async;
96
};
97

    
98
struct gluster_io{
99
        char obj_name[MAX_OBJ_NAME + 1];
100
        glfs_fd_t *fd;
101
        enum gluster_state state;
102
        uint64_t size;
103
        char *second_name, *buf;
104
        uint64_t read;
105
};
106

    
107
static inline struct glusterd *__get_gluster(struct peerd *peer)
108
{
109
        return peer->priv;
110
}
111

    
112
static inline int __set_glfs(struct glusterd *gluster, char *volume)
113
{
114
        gluster->glfs = glfs_new(volume);
115

    
116
        if (!gluster->glfs)
117
                return -1;
118

    
119
        return 0;
120
}
121

    
122
static inline glfs_t *__get_glfs(struct glusterd *gluster)
123
{
124
        return gluster->glfs;
125
}
126

    
127
int handle_read(struct peerd *peer, struct peer_req *pr);
128
int handle_write(struct peerd *peer, struct peer_req *pr);
129

    
130

    
131
static void gluster_complete_read(struct glfs_fd *fd, ssize_t ret, void *data)
132
{
133
        struct peer_req *pr = (struct peer_req*)data;
134
        struct peerd *peer = pr->peer;
135

    
136
        pr->retval = ret;
137
        dispatch(peer, pr, pr->req, dispatch_internal);
138
}
139

    
140
static void gluster_complete_write(struct glfs_fd *fd, ssize_t ret, void *data)
141
{
142
        struct peer_req *pr = (struct peer_req*)data;
143
        struct peerd *peer = pr->peer;
144

    
145
        pr->retval = ret;
146
        dispatch(peer, pr, pr->req, dispatch_internal);
147
}
148

    
149
static void submit_hook(struct peer_req *pr)
150
{
151
        struct peerd *peer = pr->peer;
152
        struct glusterd *gluster = __get_gluster(peer);
153

    
154
        if (!gluster->async)
155
                dispatch(peer, pr, pr->req, dispatch_internal);
156
}
157

    
158
static void create_hash_name(struct peer_req *pr, char *hash_name)
159
{
160
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
161
        int pos = 0;
162

    
163
        strncpy(hash_name, gio->obj_name, strlen(gio->obj_name));
164
        pos += strlen(gio->obj_name);
165
        strncpy(hash_name + pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
166
        pos += HASH_SUFFIX_LEN;
167
        hash_name[pos] = 0;
168
}
169

    
170
static int allocate_gio_secname(struct peer_req *pr)
171
{
172
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
173

    
174
        gio->second_name = malloc(MAX_OBJ_NAME + 1);
175
        if (!gio->second_name)
176
                return -1;
177

    
178
        return 0;
179
}
180

    
181
static int allocate_gio_buf(struct peer_req *pr)
182
{
183
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
184
        struct xseg_request *req = pr->req;
185

    
186
        gio->buf = malloc(req->size);
187
        if (!gio->buf)
188
                return -1;
189

    
190
        return 0;
191
}
192

    
193
static int prepare_copy(struct peer_req *pr)
194
{
195
        struct peerd *peer = pr->peer;
196
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
197
        char *data = xseg_get_data(peer->xseg, pr->req);
198
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *)data;
199
        unsigned int end = (xcopy->targetlen > MAX_OBJ_NAME) ?
200
                MAX_OBJ_NAME : xcopy->targetlen;
201
        int r;
202

    
203
        r = allocate_gio_secname(pr);
204
        if (r < 0)
205
                return r;
206

    
207
        /* FIXME: terminate or fail if targetlen > MAX_OBJ_NAME ? */
208
        strncpy(gio->second_name, xcopy->target, end);
209
        gio->second_name[end] = 0;
210
        gio->read = 0;
211

    
212
        r = allocate_gio_buf(pr);
213
        return r;
214
}
215

    
216
static int prepare_hash(struct peer_req *pr, char *hash_name)
217
{
218
        int r;
219

    
220
        r = allocate_gio_secname(pr);
221
        if (r < 0)
222
                return r;
223

    
224
        create_hash_name(pr, hash_name);
225

    
226
        r = allocate_gio_buf(pr);
227
        return r;
228
}
229

    
230
static int write_lock_owner(struct peer_req *pr, glfs_fd_t *fd)
231
{
232
        struct peerd *peer = pr->peer;
233
        struct glusterd *gluster = __get_gluster(peer);
234
        char *me = gluster->uniquestr;
235
        int len = strlen(me);
236
        int serviced = 0;
237
        int r = 0;
238

    
239
        while (len > serviced) {
240
                r = glfs_pwrite(fd, me + serviced, len - serviced,
241
                                0 + serviced, 0);
242
                if (r > 0)
243
                        serviced += r;
244
                else if (errno != EINTR)
245
                        return -1;
246
        }
247

    
248
        return 0;
249
}
250

    
251
static int read_lock_owner(struct peer_req *pr, glfs_fd_t *fd, char *owner)
252
{
253
        struct peerd *peer = pr->peer;
254
        struct glusterd *gluster = __get_gluster(peer);
255
        char *buf = gluster->uniquestr;
256
        int len = strlen(buf);
257
        int serviced = 0;
258
        int r = 0;
259

    
260
        while (len > serviced) {
261
                r = glfs_pread(fd, owner + serviced, len - serviced,
262
                                0 + serviced, 0);
263
                if (r > 0)
264
                        serviced += r;
265
                else if (r == 0)
266
                        break;
267
                else if (errno != EINTR)
268
                        return -1;
269
        }
270

    
271
        return serviced;
272
}
273

    
274
static int validate_lock_owner(struct peer_req *pr, glfs_fd_t *fd)
275
{
276
        struct peerd *peer = pr->peer;
277
        struct glusterd *gluster = __get_gluster(peer);
278
        char lock_owner[MAX_UNIQUESTR_LEN + 1];
279
        char *me = gluster->uniquestr;
280
        int len = strlen(me);
281
        int r = 0;
282

    
283
        r = read_lock_owner(pr, fd, lock_owner);
284
        if (r < 0) {
285
                XSEGLOG2(&lc, E, "Failed to read lock owner of fd %p", fd);
286
                return -1;
287
        }
288

    
289
        if (r != len || strncmp(me, lock_owner, r) != 0) {
290
                XSEGLOG2(&lc, D, "Lock owner is %s while we are %s",
291
                                lock_owner, me);
292
                return -1;
293
        }
294

    
295
        return 0;
296
}
297

    
298
static int do_aio_generic(struct peer_req *pr, uint32_t op,
299
                char *buf, uint64_t size, uint64_t offset)
300
{
301
        struct peerd *peer = pr->peer;
302
        struct glusterd *gluster = __get_gluster(peer);
303
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
304
        glfs_fd_t *fd = gio->fd;
305
        int r;
306

    
307
        switch (op) {
308
        case X_READ:
309
                if (gluster->async) {
310
                        r = glfs_pread_async(fd, buf, size, offset, 0,
311
                                        gluster_complete_read, pr);
312
                } else {
313
                        r = glfs_pread(fd, buf, size, offset, 0);
314
                        if (r >= 0)
315
                                pr->retval = r;
316
                }
317
                break;
318
        case X_WRITE:
319
                if (gluster->async) {
320
                        r = glfs_pwrite_async(fd, buf, size, offset, 0,
321
                                        gluster_complete_write, pr);
322
                } else {
323
                        r = glfs_pwrite(fd, buf, size, offset, 0);
324
                        if (r >= 0)
325
                                pr->retval = r;
326
                }
327
                break;
328
        default:
329
                return -1;
330
                break;
331
        }
332
        return r;
333
}
334

    
335
static int begin_aio_read(struct peer_req *pr, char *buf,
336
                uint64_t size, uint64_t offset)
337
{
338
        int r = 0;
339

    
340
        r = do_aio_generic(pr, X_READ, buf, size, offset);
341
        if (r >= 0)
342
                return REQ_SUBMITTED;
343
        else
344
                return REQ_FAILED;
345
}
346

    
347
static int begin_aio_write(struct peer_req *pr, char *buf,
348
                uint64_t size, uint64_t offset)
349
{
350
        int r = 0;
351

    
352
        r = do_aio_generic(pr, X_WRITE, buf, size, offset);
353
        if (r >= 0)
354
                return REQ_SUBMITTED;
355
        else
356
                return REQ_FAILED;
357
}
358

    
359
static int complete_aio_read(struct peer_req *pr, char *buf,
360
                uint64_t size, uint64_t offset, uint64_t *serviced)
361
{
362
        int r = 0;
363

    
364
        /* Leave on fail or if there are no other data */
365
        if (pr->retval < 0) {
366
                /* TODO: check errors */
367
                return REQ_FAILED;
368
        } else if (pr->retval == 0) {
369
                XSEGLOG2(&lc, I, "Zeroing rest of data");
370
                memset(buf + *serviced, 0, size - *serviced);
371
                *serviced = size;
372
                return REQ_COMPLETED;
373
        }
374

    
375
        /*
376
         * Else, check if all data have been served and resubmit if necessary
377
         */
378
        *serviced += pr->retval;
379
        if (*serviced == size) {
380
                return REQ_COMPLETED;
381
        } else {
382
                r = do_aio_generic(pr, X_READ, buf + *serviced,
383
                                size - *serviced, offset + *serviced);
384
                if (r >= 0)
385
                        return REQ_SUBMITTED;
386
                else
387
                        return REQ_FAILED;
388
        }
389
}
390

    
391
static int complete_aio_write(struct peer_req *pr, char *buf,
392
                uint64_t size, uint64_t offset, uint64_t *serviced)
393
{
394
        int r = 0;
395

    
396
        /* Leave on fail or if there are no other data */
397
        if (pr->retval < 0){
398
                /* TODO: check errors */
399
                return REQ_FAILED;
400
        }
401

    
402
        /*
403
         * Else, check if all data have been served and resubmit if necessary
404
         */
405
        *serviced += pr->retval;
406
        if (*serviced == size) {
407
                return REQ_COMPLETED;
408
        } else {
409
                r = do_aio_generic(pr, X_WRITE, buf + *serviced,
410
                                size - *serviced, offset + *serviced);
411
                if (r >= 0)
412
                        return REQ_SUBMITTED;
413
                else
414
                        return REQ_FAILED;
415
        }
416

    
417
        return r;
418
}
419

    
420
static glfs_fd_t *do_block_create(struct peer_req *pr, char *target, int mode)
421
{
422
        struct peerd *peer = pr->peer;
423
        struct glusterd *gluster = __get_gluster(peer);
424
        glfs_t *glfs = __get_glfs(gluster);
425
        glfs_fd_t *fd = NULL;
426

    
427
        /*
428
         * Create the requested file (in O_EXCL mode if requested)
429
         * If errno is EINTR, retry. If it is other that EEXIST or EINTR,
430
         * leave.
431
         */
432
        XSEGLOG2(&lc, D, "Creating target %s", target);
433
        do {
434
                errno = 0;
435
                fd = glfs_creat(glfs, target,
436
                        O_RDWR | O_CREAT | O_TRUNC | mode,
437
                        S_IRUSR | S_IWUSR);
438

    
439
                if (fd)
440
                        return fd;
441
                if (errno != EEXIST && errno != EINTR) {
442
                        XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
443
                                        "creating %s", errno, target);
444
                        return fd;
445
                }
446
        } while (errno == EINTR);
447

    
448
        return fd;
449
}
450

    
451
static glfs_fd_t *do_block_open(struct peer_req *pr, char *target, int mode)
452
{
453
        struct peerd *peer = pr->peer;
454
        struct glusterd *gluster = __get_gluster(peer);
455
        glfs_t *glfs = __get_glfs(gluster);
456
        glfs_fd_t *fd;
457

    
458
        /*
459
         * Open the requested file.
460
         * If errno is EINTR, retry. If it is other that ENOENT or EINTR,
461
         * leave.
462
         */
463
        XSEGLOG2(&lc, D, "Opening target %s", target);
464
        do {
465
                errno = 0;
466
                fd = glfs_open(glfs, target, O_RDWR);
467

    
468
                if (fd)
469
                        return fd;
470
                if (errno != ENOENT && errno != EINTR) {
471
                        XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
472
                                        "opening %s:", errno, target);
473
                        return fd;
474
                }
475
        } while (errno == EINTR);
476

    
477
        if (!fd && !(mode & O_CREAT))
478
                return NULL;
479

    
480
        /*
481
         * Create the requested file only if user has demanded so.
482
         * If errno is EINTR, retry. Else, leave.
483
         */
484
        fd = do_block_create(pr, target, 0);
485
        return fd;
486
}
487

    
488
static int do_block_close(struct peer_req *pr)
489
{
490
        struct gluster_io *gio = (struct gluster_io *)(pr->priv);
491
        glfs_fd_t *fd = gio->fd;
492
        int r;
493

    
494
        XSEGLOG2(&lc, D, "Closing fd %p", fd);
495
        if (!fd)
496
                return 0;
497

    
498
        r = glfs_close(fd);
499
        if (r < 0)
500
                XSEGLOG2(&lc, E, "Unexpected error (errno %d) while "
501
                                "closing fd %p:", errno, fd);
502

    
503
        return r;
504
}
505

    
506
/*
507
 * do_block_lock() does the following:
508
 * a. Checks if there is a lock-file for this object.
509
 *    i. Checks if the lock-file has been placed by us.
510
 *    ii. If we own the lock-file or if we are in XF_NOSYNC mode, it returns.
511
 *    iii. Else, it proceeds to (b)
512
 * b. Attempts to create its own lockfile (obj_name + lock suffix). If it
513
 *    succeeds or is in XF_NOSYNC mode, it returns.
514
 * c. If it has not locked the target object and is NOT in XF_NOSYNC mode, it
515
 *    sleeps for 1 second and it reattempts.
516
 */
517
int do_block_lock(struct peer_req *pr, char *target, int mode)
518
{
519
        glfs_fd_t *fd = NULL;
520
        int r;
521

    
522
        XSEGLOG2(&lc, D, "Locking target %s", target);
523
        do {
524
                fd = do_block_open(pr, target, 0);
525
                if (fd) {
526
                        r = validate_lock_owner(pr, fd);
527
                        if (r > 0 || mode == XF_NOSYNC)
528
                                break;
529
                }
530

    
531
                fd = do_block_create(pr, target, O_EXCL);
532
                if (fd) {
533
                        r = write_lock_owner(pr, fd);
534
                        break;
535
                } else if (mode == XF_NOSYNC) {
536
                        r = -1;
537
                        break;
538
                }
539

    
540
                sleep(1);
541
        } while (1);
542

    
543
        glfs_close(fd);
544

    
545
        return r;
546
}
547

    
548
/*
549
 * do_block_unlock() does the following:
550
 * a. Checks if there is a lock-file for this object. If not, it can safely
551
 *    return.
552
 * b. Checks if the lock-file has been placed by us.
553
 *    ii. If we own the lock-file or if we are in XF_FORCE mode, we unlink it.
554
 * c. If the unlinking was successful or if it failed due to ENOENT, the
555
 *    unlock operation is considered successful.
556
 */
557
int do_block_unlock(struct peer_req *pr, char *target, int mode)
558
{
559
        struct peerd *peer = pr->peer;
560
        struct glusterd *gluster = __get_gluster(peer);
561
        glfs_t *glfs = __get_glfs(gluster);
562
        glfs_fd_t *fd = NULL;
563
        int r = 0;
564

    
565
        XSEGLOG2(&lc, D, "Unlocking target %s", target);
566
        fd = do_block_open(pr, target, 0);
567
        if (!fd) {
568
                XSEGLOG2(&lc, D, "Target %s was not locked", target);
569
                return -1;
570
        }
571

    
572
        r = validate_lock_owner(pr, fd);
573
        if (r < 0 && !(mode & XF_FORCE))
574
                return -1;
575

    
576
        r = glfs_unlink(glfs, target);
577
        if (r >= 0 || errno == ENOENT)
578
                return 0;
579
        else
580
                return -1;
581
}
582

    
583
int do_block_stat(struct peer_req *pr, char *target, struct stat *buf)
584
{
585
        struct peerd *peer = pr->peer;
586
        struct glusterd *gluster = __get_gluster(peer);
587
        glfs_t *glfs = __get_glfs(gluster);
588

    
589
        XSEGLOG2(&lc, D, "Stating target %s", target);
590
        return glfs_stat(glfs, target, buf);
591
}
592

    
593
int do_block_delete(struct peer_req *pr, char *target)
594
{
595
        struct peerd *peer = pr->peer;
596
        struct glusterd *gluster = __get_gluster(peer);
597
        glfs_t *glfs = __get_glfs(gluster);
598

    
599
        XSEGLOG2(&lc, D, "Deleting target %s", target);
600
        return glfs_unlink(glfs, target);
601
}
602

    
603
int handle_delete(struct peerd *peer, struct peer_req *pr)
604
{
605
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
606
        struct xseg_request *req = pr->req;
607
        int r;
608

    
609
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
610
                        pr, req, gio->obj_name);
611

    
612
        r = do_block_delete(pr, gio->obj_name);
613
        if (r < 0) {
614
                XSEGLOG2(&lc, E, "Deletion of %s failed", gio->obj_name);
615
                fail(peer, pr);
616
        } else {
617
                XSEGLOG2(&lc, I, "Deletion of %s completed", gio->obj_name);
618
                complete(peer, pr);
619
        }
620
        return 0;
621
}
622

    
623
int handle_info(struct peerd *peer, struct peer_req *pr)
624
{
625
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
626
        struct xseg_request *req = pr->req;
627
        struct xseg_reply_info *xinfo;
628
        struct stat stat;
629
        char *req_data;
630
        char buf[XSEG_MAX_TARGETLEN + 1];
631
        char *target = xseg_get_target(peer->xseg, req);
632
        int r;
633

    
634
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
635
                        pr, req, gio->obj_name);
636

    
637
        if (req->datalen < sizeof(struct xseg_reply_info)) {
638
                /* FIXME: Is this normal? */
639
                strncpy(buf, target, req->targetlen);
640
                r = xseg_resize_request(peer->xseg, req, req->targetlen,
641
                                sizeof(struct xseg_reply_info));
642
                if (r < 0) {
643
                        XSEGLOG2(&lc, E, "Cannot resize request");
644
                        fail(peer, pr);
645
                        return -1;
646
                }
647
                target = xseg_get_target(peer->xseg, req);
648
                strncpy(target, buf, req->targetlen);
649
        }
650

    
651
        r = do_block_stat(pr, gio->obj_name, &stat);
652
        if (r < 0) {
653
                XSEGLOG2(&lc, E, "Stat failed for %s", gio->obj_name);
654
                fail(peer, pr);
655
                return -1;
656
        }
657

    
658
        req_data = xseg_get_data(peer->xseg, pr->req);
659
        xinfo = (struct xseg_reply_info *)req_data;
660
        xinfo->size = (uint64_t)stat.st_size;
661

    
662
        XSEGLOG2(&lc, I, "Getting info of %s completed", gio->obj_name);
663
        complete(peer, pr);
664
        return 0;
665
}
666

    
667
void handle_ping(struct peerd *peer, struct peer_req *pr)
668
{
669
        XSEGLOG2(&lc, D, "Ping accepted. Acknowledging...");
670

    
671
        complete(peer, pr);
672
}
673

    
674
int handle_read(struct peerd *peer, struct peer_req *pr)
675
{
676
        struct gluster_io *gio = (struct gluster_io *) (pr->priv);
677
        struct xseg_request *req = pr->req;
678
        glfs_fd_t *fd;
679
        char *data = xseg_get_data(peer->xseg, pr->req);
680
        char *target = gio->obj_name;
681
        int ret = REQ_UNDEFINED;
682

    
683
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
684
                        pr, req, target);
685

    
686
        if (gio->state == ACCEPTED) {
687
                if (req->datalen < req->size) {
688
                        XSEGLOG2(&lc, E, "Request datalen is less than "
689
                                        "request size");
690
                        ret = REQ_FAILED;
691
                        goto out;
692
                }
693

    
694
                if (!req->size) {
695
                        ret = REQ_COMPLETED;
696
                        goto out;
697
                }
698

    
699
                fd = do_block_open(pr, target, 0);
700
                if (!fd) {
701
                        XSEGLOG2(&lc, I, "Object %s does not exist. "
702
                                        "Serving zero data\n", target);
703
                        /* object not found. return zeros instead */
704
                        memset(data, 0, req->size);
705
                        req->serviced = req->size;
706
                        ret = REQ_COMPLETED;
707
                        goto out;
708
                }
709
                gio->fd = fd;
710

    
711
                XSEGLOG2(&lc, I, "Reading %s", target);
712

    
713
                gio->state = READING;
714
                ret = begin_aio_read(pr, data, req->size, req->offset);
715
        } else if (gio->state == READING) {
716
                XSEGLOG2(&lc, I, "Reading of %s callback", target);
717
                ret = complete_aio_read(pr, data, req->size,
718
                                req->offset, &req->serviced);
719
        }
720

    
721
out:
722
        switch (ret) {
723
        case REQ_FAILED:
724
                XSEGLOG2(&lc, E, "Reading of %s failed", target);
725
                do_block_close(pr);
726
                fail(peer, pr);
727
                break;
728
        case REQ_SUBMITTED:
729
                XSEGLOG2(&lc, I, "Reading of %s submitted", target);
730
                submit_hook(pr);
731
                break;
732
        case REQ_COMPLETED:
733
                XSEGLOG2(&lc, I, "Reading of %s completed", target);
734
                do_block_close(pr);
735
                complete(peer, pr);
736
                break;
737
        default:
738
                XSEGLOG2(&lc, E, "Unknown request state. Failing.");
739
                do_block_close(pr);
740
                fail(peer, pr);
741
                break;
742
        }
743
        return 0;
744
}
745

    
746
int handle_write(struct peerd *peer, struct peer_req *pr)
747
{
748
        struct gluster_io *gio = (struct gluster_io *) (pr->priv);
749
        struct xseg_request *req = pr->req;
750
        glfs_fd_t *fd;
751
        char *data = xseg_get_data(peer->xseg, pr->req);
752
        char *target = gio->obj_name;
753
        int ret = REQ_UNDEFINED;
754

    
755
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
756
                        pr, req, target);
757

    
758
        if (gio->state == ACCEPTED) {
759
                if (req->datalen < req->size) {
760
                        XSEGLOG2(&lc, E, "Request datalen is less than "
761
                                        "request size");
762
                        ret = REQ_FAILED;
763
                        goto out;
764
                }
765

    
766
                if (!req->size) {
767
                        /* TODO: Flush data if req->flags & XF_FLUSH */
768
                        ret = REQ_COMPLETED;
769
                        goto out;
770
                }
771

    
772
                fd = do_block_open(pr, target, O_CREAT);
773
                if (!fd) {
774
                        XSEGLOG2(&lc, E, "Cannot open/create %s", target);
775
                        ret = REQ_FAILED;
776
                        goto out;
777
                }
778
                gio->fd = fd;
779

    
780
                XSEGLOG2(&lc, I, "Writing %s", target);
781

    
782
                gio->state = WRITING;
783
                ret = begin_aio_write(pr, data, req->size, req->offset);
784
        } else if (gio->state == WRITING) {
785
                XSEGLOG2(&lc, I, "Writing of %s callback", target);
786
                ret = complete_aio_write(pr, data, req->size,
787
                                req->offset, &req->serviced);
788
        }
789

    
790
out:
791
        switch (ret) {
792
        case REQ_FAILED:
793
                XSEGLOG2(&lc, E, "Writing of %s failed", target);
794
                do_block_close(pr);
795
                fail(peer, pr);
796
                break;
797
        case REQ_SUBMITTED:
798
                XSEGLOG2(&lc, I, "Writing of %s submitted", target);
799
                submit_hook(pr);
800
                break;
801
        case REQ_COMPLETED:
802
                XSEGLOG2(&lc, I, "Writing of %s completed", target);
803
                do_block_close(pr);
804
                complete(peer, pr);
805
                break;
806
        default:
807
                XSEGLOG2(&lc, E, "Unknown request state. Failing.");
808
                do_block_close(pr);
809
                fail(peer, pr);
810
                break;
811
        }
812
        return 0;
813
}
814

    
815
int handle_copy(struct peerd *peer, struct peer_req *pr)
816
{
817
        struct gluster_io *gio = (struct gluster_io *) (pr->priv);
818
        struct xseg_request *req = pr->req;
819
        glfs_fd_t *fd;
820
        char *target = gio->obj_name;
821
        int r;
822
        int ret = REQ_UNDEFINED;
823

    
824
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
825
                        pr, req, target);
826

    
827
        if (gio->state == ACCEPTED) {
828
                /* Create second name and buf */
829
                r = prepare_copy(pr);
830
                if (r < 0) {
831
                        ret = REQ_FAILED;
832
                        goto out;
833
                }
834
                target = gio->second_name;
835

    
836
                XSEGLOG2(&lc, I, "Copy of object %s to object %s started",
837
                                gio->second_name, gio->obj_name);
838

    
839
                if (!req->size) {
840
                        ret = REQ_COMPLETED;
841
                        goto out;
842
                }
843

    
844
                /* FIXME: Will we fail here? */
845
                fd = do_block_open(pr, target, 0);
846
                if (!fd) {
847
                        XSEGLOG2(&lc, I, "Object %s does not exist. "
848
                                        "Serving zero data\n", target);
849
                        /* object not found. return zeros instead */
850
                        memset(gio->buf, 0, req->size);
851
                        goto write;
852
                }
853
                gio->fd = fd;
854

    
855
                XSEGLOG2(&lc, I, "Reading %s", target);
856

    
857
                gio->state = READING;
858
                gio->read = 0;
859
                ret = begin_aio_read(pr, gio->buf, req->size, req->offset);
860
        }
861
        else if (gio->state == READING){
862
                target = gio->second_name;
863
                XSEGLOG2(&lc, I, "Reading of %s callback", target);
864

    
865
                ret = complete_aio_read(pr, gio->buf, req->size,
866
                                req->offset, &gio->read);
867

    
868
                if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
869
                        goto out;
870

    
871
                do_block_close(pr);
872
write:
873
                target = gio->obj_name;
874
                XSEGLOG2(&lc, I, "Target is %s", target);
875
                fd = do_block_open(pr, target, O_CREAT);
876
                if (!fd) {
877
                        XSEGLOG2(&lc, E, "Cannot open/create %s", target);
878
                        ret = REQ_FAILED;
879
                        goto out;
880
                }
881
                gio->fd = fd;
882

    
883
                XSEGLOG2(&lc, I, "Writing %s", target);
884

    
885
                gio->state = WRITING;
886
                ret = begin_aio_write(pr, gio->buf, req->size, req->offset);
887
        } else if (gio->state == WRITING) {
888
                XSEGLOG2(&lc, I, "Writing of %s callback", target);
889
                ret = complete_aio_write(pr, gio->buf, req->size,
890
                                req->offset, &req->serviced);
891
        }
892

    
893
out:
894
        if (ret != REQ_SUBMITTED) {
895
                free(gio->buf);
896
                free(gio->second_name);
897
                gio->buf = NULL;
898
                gio->second_name = NULL;
899
                gio->read = 0;
900
        }
901

    
902
        switch (ret) {
903
        case REQ_FAILED:
904
                XSEGLOG2(&lc, E, "Copying of %s failed", target);
905
                do_block_close(pr);
906
                fail(peer, pr);
907
                break;
908
        case REQ_SUBMITTED:
909
                submit_hook(pr);
910
                break;
911
        case REQ_COMPLETED:
912
                XSEGLOG2(&lc, I, "Copying of %s completed", target);
913
                do_block_close(pr);
914
                complete(peer, pr);
915
                break;
916
        default:
917
                XSEGLOG2(&lc, E, "Unknown request state. Failing.");
918
                do_block_close(pr);
919
                fail(peer, pr);
920
                break;
921
        }
922
        return 0;
923
}
924

    
925
int handle_hash(struct peerd *peer, struct peer_req *pr)
926
{
927
        struct xseg_request *req = pr->req;
928
        struct gluster_io *gio = (struct gluster_io *) pr->priv;
929
        struct xseg_reply_hash *xreply;
930
        glfs_fd_t *fd;
931
        uint64_t trailing_zeros = 0;
932
        unsigned char sha[SHA256_DIGEST_SIZE];
933
        char hash_name[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
934
        uint32_t pos;
935
        char *target = gio->obj_name;
936
        int r;
937
        int ret = REQ_UNDEFINED;
938

    
939
        XSEGLOG2(&lc, D, "Started for pr %p, req %p, target %s",
940
                        pr, req, target);
941

    
942
        if (gio->state == ACCEPTED){
943
                XSEGLOG2(&lc, I, "Starting hashing of object %s", target);
944

    
945
                if (!req->size) {
946
                        ret = REQ_COMPLETED; /* or fail? */
947
                        goto out;
948
                }
949

    
950
                /* Create hash_name, gio second_name and gio->buf */
951
                r = prepare_hash(pr, hash_name);
952
                if (r < 0) {
953
                        ret = REQ_FAILED;
954
                        goto out;
955
                }
956
                target = hash_name;
957
                gio->state = PREHASHING;
958

    
959
                /* Get correct status */
960
                fd = do_block_open(pr, target, 0);
961
                if (!fd) {
962
                        XSEGLOG2(&lc, I, "Hash %s does not exist.", target);
963
                        goto read;
964
                }
965
                gio->fd = fd;
966

    
967
                XSEGLOG2(&lc, I, "Reading %s", target);
968
                /* Read contents of hash_name in the gio->second_name buffer */
969
                ret = begin_aio_read(pr, gio->second_name,
970
                                HEXLIFIED_SHA256_DIGEST_SIZE, req->offset);
971
        } else if (gio->state == PREHASHING) {
972
                target = hash_name;
973
                XSEGLOG2(&lc, I, "Reading of %s callback", target);
974
                ret = complete_aio_read(pr, gio->second_name,
975
                                HEXLIFIED_SHA256_DIGEST_SIZE,
976
                                req->offset, &gio->read);
977

    
978
                if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
979
                        goto out;
980

    
981
                /* Construct answer */
982
                XSEGLOG2(&lc, D, "Precalculated hash found");
983
                xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
984
                r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
985
                                sizeof(struct xseg_reply_hash));
986
                strncpy(xreply->target, gio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
987
                xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
988

    
989
                XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
990
                                gio->second_name, gio->obj_name);
991
                req->serviced = req->size;
992
                goto out;
993
                /* Leave */
994

    
995
read:
996
                /*
997
                 * We reach this point only if there was no precalculated hash
998
                 */
999
                gio->state = READING;
1000
                gio->read = 0;
1001
                target = gio->obj_name;
1002

    
1003
                fd = do_block_open(pr, target, 0);
1004
                if (!fd) {
1005
                        XSEGLOG2(&lc, I, "Original object %s does not "
1006
                                        "exist.\nServing zeroes.", target);
1007
                        memset(gio->buf, 0, req->size);
1008
                        gio->read = req->size;
1009
                        goto hash;
1010
                }
1011
                gio->fd = fd;
1012

    
1013
                XSEGLOG2(&lc, I, "Reading %s", target);
1014
                ret = begin_aio_read(pr, gio->buf, req->size, req->offset);
1015
        } else if (gio->state == READING) {
1016
                target = gio->obj_name;
1017
                XSEGLOG2(&lc, I, "Reading of %s callback", target);
1018
                ret = complete_aio_read(pr, gio->buf, req->size,
1019
                                req->offset, &gio->read);
1020

    
1021
                if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
1022
                        goto out;
1023

    
1024
                do_block_close(pr);
1025

    
1026
hash:
1027
                /* Strip here trailing zeroes */
1028
                for (; trailing_zeros < gio->read; trailing_zeros++) {
1029
                        if (gio->buf[gio->read-trailing_zeros -1])
1030
                                break;
1031
                }
1032
                XSEGLOG2(&lc, D, "Read %llu, Trailing zeros %llu",
1033
                                gio->read, trailing_zeros);
1034

    
1035
                gio->read -= trailing_zeros;
1036
                SHA256((unsigned char *) gio->buf, gio->read, sha);
1037
                hexlify(sha, SHA256_DIGEST_SIZE, gio->second_name);
1038
                gio->second_name[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
1039

    
1040
                /* Construct reply */
1041
                xreply = (struct xseg_reply_hash*)xseg_get_data(peer->xseg, req);
1042
                r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
1043
                                sizeof(struct xseg_reply_hash));
1044
                strncpy(xreply->target, gio->second_name, HEXLIFIED_SHA256_DIGEST_SIZE);
1045
                xreply->targetlen = HEXLIFIED_SHA256_DIGEST_SIZE;
1046

    
1047
                XSEGLOG2(&lc, I, "Calculated %s as hash of %s",
1048
                                gio->second_name, gio->obj_name);
1049
                /*
1050
                 * We can't leave since we need to write the
1051
                 * content-addressable object to the backend.
1052
                 */
1053

    
1054
                gio->read = 0;
1055
                fd = do_block_create(pr, gio->second_name, O_EXCL);
1056
                if (!fd) {
1057
                        XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
1058
                                        gio->obj_name, gio->second_name);
1059
                        req->serviced = req->size;
1060
                        goto write;
1061
                }
1062
                gio->fd = fd;
1063
                target = gio->second_name;
1064

    
1065
                XSEGLOG2(&lc, I, "Writing %s", target);
1066

    
1067
                gio->state = WRITING;
1068
                ret = begin_aio_write(pr, gio->buf, req->size, req->offset);
1069
        } else if (gio->state == WRITING) {
1070
                target = gio->second_name;
1071
                XSEGLOG2(&lc, I, "Writing of %s callback", target);
1072
                ret = complete_aio_write(pr, gio->buf, req->size, req->offset,
1073
                                &req->serviced);
1074

    
1075
                if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
1076
                        goto out;
1077

    
1078
                XSEGLOG2(&lc, I, "Writing of %s completed", gio->second_name);
1079

    
1080
                /*
1081
                 * We can't leave since we need to write the precalculated hash
1082
                 * value to the backend.
1083
                 */
1084
                do_block_close(pr);
1085
write:
1086
                pos = 0;
1087
                strncpy(hash_name, gio->obj_name, strlen(gio->obj_name));
1088
                pos += strlen(gio->obj_name);
1089
                strncpy(hash_name+pos, HASH_SUFFIX, HASH_SUFFIX_LEN);
1090
                pos += HASH_SUFFIX_LEN;
1091
                hash_name[pos] = 0;
1092

    
1093
                gio->state = POSTHASHING;
1094
                fd = do_block_create(pr, hash_name, O_EXCL);
1095
                if (!fd) {
1096
                        XSEGLOG2(&lc, I, "Writing of prehashed value completed");
1097
                        XSEGLOG2(&lc, I, "Hash of object %s to object %s completed",
1098
                                        gio->obj_name, gio->second_name);
1099
                        req->serviced = req->size;
1100
                        ret = REQ_COMPLETED;
1101
                        /* TODO: Check errors */
1102
                        goto out;
1103
                }
1104
                gio->fd = fd;
1105
                target = hash_name;
1106

    
1107
                XSEGLOG2(&lc, I, "Writing prehashed value");
1108
                ret = begin_aio_write(pr, gio->second_name,
1109
                                HEXLIFIED_SHA256_DIGEST_SIZE, 0);
1110
        } else if (gio->state == POSTHASHING) {
1111
                XSEGLOG2(&lc, I, "Writing of prehashed value callback");
1112
                ret = complete_aio_write(pr, gio->second_name,
1113
                                HEXLIFIED_SHA256_DIGEST_SIZE, 0, &gio->read);
1114

    
1115
                if (ret == REQ_FAILED || ret == REQ_SUBMITTED)
1116
                        goto out;
1117

    
1118
                XSEGLOG2(&lc, I, "Writing of prehashed value completed");
1119
                req->serviced = req->size;
1120
        }
1121

    
1122
out:
1123
        target = gio->obj_name;
1124
        if (ret != REQ_SUBMITTED) {
1125
                free(gio->buf);
1126
                free(gio->second_name);
1127
                gio->buf = NULL;
1128
                gio->second_name = NULL;
1129
                gio->read = 0;
1130
        }
1131

    
1132
        switch (ret) {
1133
        case REQ_FAILED:
1134
                XSEGLOG2(&lc, E, "Hashing of %s failed", target);
1135
                do_block_close(pr);
1136
                fail(peer, pr);
1137
                break;
1138
        case REQ_SUBMITTED:
1139
                submit_hook(pr);
1140
                break;
1141
        case REQ_COMPLETED:
1142
                XSEGLOG2(&lc, I, "Hashing of %s completed", target);
1143
                do_block_close(pr);
1144
                complete(peer, pr);
1145
                break;
1146
        default:
1147
                XSEGLOG2(&lc, E, "Unknown request state. Failing.");
1148
                do_block_close(pr);
1149
                fail(peer, pr);
1150
                break;
1151
        }
1152
        return 0;
1153
}
1154

    
1155
int handle_acquire(struct peerd *peer, struct peer_req *pr)
1156
{
1157
        struct gluster_io *gio = (struct gluster_io *)(pr->priv);
1158
        struct xseg_request *req = pr->req;
1159
        uint32_t len = strlen(gio->obj_name);
1160
        int ret;
1161

    
1162
        strncpy(gio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1163
        gio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
1164

    
1165
        XSEGLOG2(&lc, I, "Starting lock op for %s", gio->obj_name);
1166

    
1167
        /* TODO: Check error codes and retry if needed */
1168
        ret = do_block_lock(pr, gio->obj_name, req->flags & XF_NOSYNC);
1169
        if (ret < 0) {
1170
                XSEGLOG2(&lc, E, "Lock op failed for %s", gio->obj_name);
1171
                fail(peer, pr);
1172
        } else {
1173
                XSEGLOG2(&lc, I, "Lock op succeeded for %s", gio->obj_name);
1174
                complete(peer, pr);
1175
        }
1176
        return 0;
1177
}
1178

    
1179

    
1180
int handle_release(struct peerd *peer, struct peer_req *pr)
1181
{
1182
        struct gluster_io *gio = (struct gluster_io *)(pr->priv);
1183
        struct xseg_request *req = pr->req;
1184
        uint32_t len = strlen(gio->obj_name);
1185
        int ret;
1186

    
1187
        strncpy(gio->obj_name + len, LOCK_SUFFIX, LOCK_SUFFIX_LEN);
1188
        gio->obj_name[len + LOCK_SUFFIX_LEN] = 0;
1189

    
1190
        XSEGLOG2(&lc, I, "Starting unlock op for %s", gio->obj_name);
1191

    
1192
        /* TODO: Check error codes and retry if needed */
1193
        ret = do_block_unlock(pr, gio->obj_name, req->flags & XF_FORCE);
1194
        if (ret < 0) {
1195
                XSEGLOG2(&lc, E, "Unlock op failed for %s", gio->obj_name);
1196
                fail(peer, pr);
1197
        } else {
1198
                XSEGLOG2(&lc, I, "Unlock op succeeded for %s", gio->obj_name);
1199
                complete(peer, pr);
1200
        }
1201
        return 0;
1202
}
1203

    
1204
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1205
{
1206
        struct glusterd *gluster = malloc(sizeof(struct glusterd));
1207
        struct gluster_io *gio;
1208
        glfs_t *glfs = NULL;
1209
        char transport[MAX_GLFS_ARG_LEN + 1];
1210
        char server[MAX_GLFS_ARG_LEN + 1];
1211
        char volume[MAX_GLFS_ARG_LEN + 1];
1212
        char uniquestr[MAX_UNIQUESTR_LEN + 1];
1213
        int tid;
1214
        int port = 0;
1215
        int ret = 0;
1216
        int i = 0;
1217
        int j = 0;
1218

    
1219
        if (!gluster) {
1220
                perror("malloc");
1221
                return -1;
1222
        }
1223
        gluster->async = 0;
1224

    
1225
        uniquestr[0] = 0;
1226
        transport[0] = 0;
1227
        server[0] = 0;
1228
        volume[0] = 0;
1229

    
1230
        BEGIN_READ_ARGS(argc, argv);
1231
        READ_ARG_STRING("--transport", transport, MAX_GLFS_ARG_LEN);
1232
        READ_ARG_STRING("--server", server, MAX_GLFS_ARG_LEN);
1233
        READ_ARG_ULONG("--port", port);
1234
        READ_ARG_STRING("--volume", volume, MAX_GLFS_ARG_LEN);
1235
        READ_ARG_STRING("--uniquestr", gluster->uniquestr, MAX_UNIQUESTR_LEN);
1236
        READ_ARG_BOOL("--async", gluster->async);
1237
        END_READ_ARGS();
1238

    
1239

    
1240
        if (!volume[0]){
1241
                XSEGLOG2(&lc, E , "Volume must be provided");
1242
                usage(argv[0]);
1243
                goto err_arg;
1244
        }
1245

    
1246
        /* Use defaults if user has not provided his/her own */
1247
        if (!transport[0])
1248
                strncpy(transport, "tcp", 4);
1249

    
1250
        if (!server[0])
1251
                strncpy(server, "127.0.0.1", 10);
1252

    
1253
        if (!uniquestr[0]) {
1254
                tid = getpid();
1255
                snprintf(gluster->uniquestr, MAX_UNIQUESTR_LEN, "%d", tid);
1256
                XSEGLOG2(&lc, W, "Warning, uniquestr not provided.\n"
1257
                                "\tUsing instead \"%s\" but uniqueness is not "
1258
                                "guaranteed.", gluster->uniquestr);
1259
        } else {
1260
                strncpy(gluster->uniquestr, uniquestr, MAX_UNIQUESTR_LEN);
1261
        }
1262

    
1263
        ret = __set_glfs(gluster, volume);
1264
        if (ret < 0) {
1265
                XSEGLOG2(&lc, E, "Error at glfs_new");
1266
                goto err_glfs;
1267
        }
1268
        glfs = __get_glfs(gluster);
1269

    
1270
        ret = glfs_set_volfile_server(glfs, transport, server, port);
1271
        if (ret < 0) {
1272
                XSEGLOG2(&lc, E, "Error at glfs_set_volfile_server");
1273
                goto err_glfs;
1274
        }
1275

    
1276
        ret = glfs_init(glfs);
1277
        if (ret) {
1278
                XSEGLOG2(&lc, E, "Error at glfs_init\n");
1279
                goto err_glfs;
1280
        }
1281

    
1282
        peer->priv = (void *)gluster;
1283
        for (i = 0; i < peer->nr_ops; i++) {
1284
                gio = malloc(sizeof(struct gluster_io));
1285

    
1286
                if (!gio)
1287
                        goto err_gio;
1288

    
1289
                gio->fd = NULL;
1290
                gio->buf = 0;
1291
                gio->read = 0;
1292
                gio->size = 0;
1293
                gio->second_name = 0;
1294
                peer->peer_reqs[i].priv = (void *) gio;
1295
        }
1296
        return 0;
1297

    
1298
err_arg:
1299
        free(gluster);
1300
err_gio:
1301
        for (j = 0; j < i; j++)
1302
                free(peer->peer_reqs[j].priv);
1303
err_glfs:
1304
        glfs_fini(glfs);
1305
        return -1;
1306
}
1307

    
1308
// nothing to do here for now
1309
int custom_arg_parse(int argc, const char *argv[])
1310
{
1311
        return 0;
1312
}
1313

    
1314
void custom_peer_finalize(struct peerd *peer)
1315
{
1316
        struct glusterd *gluster = __get_gluster(peer);
1317
        glfs_t *glfs = __get_glfs(gluster);
1318

    
1319
        glfs_fini(glfs);
1320
        XSEGLOG2(&lc, I, "Glusterd has closed successfully");
1321
}
1322

    
1323
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1324
                enum dispatch_reason reason)
1325
{
1326
        struct gluster_io *gio = (struct gluster_io *) (pr->priv);
1327
        char *target = xseg_get_target(peer->xseg, pr->req);
1328
        unsigned int end = (pr->req->targetlen > MAX_OBJ_NAME) ?
1329
                MAX_OBJ_NAME : pr->req->targetlen;
1330

    
1331
        if (reason == dispatch_accept) {
1332
                strncpy(gio->obj_name, target, end);
1333
                gio->obj_name[end] = 0;
1334
                gio->state = ACCEPTED;
1335
                gio->fd = NULL;
1336
                gio->read = 0;
1337
        }
1338

    
1339
        switch (pr->req->op){
1340
                case X_READ:
1341
                        handle_read(peer, pr);
1342
                        break;
1343
                case X_WRITE:
1344
                        handle_write(peer, pr);
1345
                        break;
1346
                case X_DELETE:
1347
                        if (canDefer(peer))
1348
                                defer_request(peer, pr);
1349
                        else
1350
                                handle_delete(peer, pr);
1351
                        break;
1352
                case X_INFO:
1353
                        if (canDefer(peer))
1354
                                defer_request(peer, pr);
1355
                        else
1356
                                handle_info(peer, pr);
1357
                        break;
1358
                case X_COPY:
1359
                        if (canDefer(peer))
1360
                                defer_request(peer, pr);
1361
                        else
1362
                                handle_copy(peer, pr);
1363
                        break;
1364
                case X_ACQUIRE:
1365
                        handle_acquire(peer, pr);
1366
                        break;
1367
                case X_RELEASE:
1368
                        handle_release(peer, pr);
1369
                        break;
1370
                case X_HASH:
1371
                        handle_hash(peer, pr);
1372
                        break;
1373
                case X_PING:
1374
                        handle_ping(peer, pr);
1375
                        break;
1376
                default:
1377
                        fail(peer, pr);
1378
        }
1379
        return 0;
1380
}