Statistics
| Branch: | Revision:

root / xseg / peers / user / mt-vlmcd.c @ ae8c75f4

History | View | Annotate | Download (22.4 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <stdlib.h>
37
#include <unistd.h>
38
#include <xseg/xseg.h>
39
#include <xseg/protocol.h>
40
#include <peer.h>
41
#include <sched.h>
42
#include <sys/syscall.h>
43

    
44
enum io_state_enum {
45
        ACCEPTED = 0,
46
        MAPPING = 1,
47
        SERVING = 2,
48
        CONCLUDED = 3
49
};
50

    
51
#define VF_VOLUME_FREEZED (1 << 0)
52

    
53
struct volume_info{
54
        char name[XSEG_MAX_TARGETLEN + 1];
55
        uint32_t flags;
56
        uint32_t active_reqs;
57
        struct xq *pending_reqs;
58
        struct peer_req *pending_pr;
59
};
60

    
61
struct vlmcd {
62
        xport mportno;
63
        xport bportno;
64
        xhash_t *volumes; //hash [volumename] -> struct volume_info
65
};
66

    
67
struct vlmc_io {
68
        int err;
69
        struct xlock lock;
70
        volatile enum io_state_enum state;
71
        struct xseg_request *mreq;
72
        struct xseg_request **breqs;
73
        unsigned long breq_len, breq_cnt;
74
};
75

    
76
void custom_peer_usage()
77
{
78
        fprintf(stderr, "Custom peer options: \n"
79
                        "-mp : mapper port\n"
80
                        "-bp : blocker port for blocks\n"
81
                        "\n");
82
}
83

    
84
static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
85
{
86
        vio->state = state;
87
}
88

    
89
static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
90
{
91
        enum io_state_enum state;
92
        state = vio->state;
93
        return state;
94
}
95

    
96
static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
97
{
98
        return (struct vlmc_io *) pr->priv;
99
}
100

    
101
static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
102
{
103
        return (struct vlmcd *) peer->priv;
104
}
105

    
106
static struct xq * allocate_queue(xqindex nr)
107
{
108
        struct xq *q = malloc(sizeof(struct xq));
109
        if (!q)
110
                return NULL;
111
        if (!xq_alloc_empty(q, nr)){
112
                free(q);
113
                return NULL;
114
        }
115
        return q;
116
}
117

    
118
static int doubleup_queue(struct volume_info *vi)
119
{
120
        //assert vi->pending_reqs
121
        XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
122
        struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
123
        if (!newq){
124
                XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
125
                                vi->name);
126
                return -1;
127
        }
128

    
129
        if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
130
                xq_free(newq);
131
                free(newq);
132
                XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
133
                                vi->name);
134
                return -1;
135
        }
136
        xq_free(vi->pending_reqs);
137
        free(vi->pending_reqs);
138
        vi->pending_reqs = newq;
139
        XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
140
        return 0;
141
}
142

    
143
static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
144
{
145
        struct volume_info *vi = NULL;
146
        XSEGLOG2(&lc, D, "looking up volume %s", volume);
147
        int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
148
                        (xhashidx *) &vi);
149
        if (r < 0){
150
                XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
151
                return NULL;
152
        }
153
        XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
154
                        volume, (unsigned long)vi);
155
        return vi;
156
}
157

    
158
static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
159
                                                uint32_t targetlen)
160
{
161
        char buf[XSEG_MAX_TARGETLEN+1];
162
        strncpy(buf, target, targetlen);
163
        buf[targetlen] = 0;
164
        XSEGLOG2(&lc, D, "looking up volume %s, len %u",
165
                        buf, targetlen);
166
        return find_volume(vlmc, buf);
167

    
168
}
169

    
170
static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
171
{
172
        int r = -1;
173

    
174
        if (find_volume(vlmc, vi->name)){
175
                XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
176
                return r;
177
        }
178

    
179
        XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)", 
180
                        vi->name, strlen(vi->name), (unsigned long) vi);
181
        r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
182
        while (r == -XHASH_ERESIZE) {
183
                xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
184
                xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, 0, NULL);
185
                if (!new_hashmap){
186
                        XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
187
                                        (unsigned long long) shift);
188
                        return r;
189
                }
190
                vlmc->volumes = new_hashmap;
191
                r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
192
        }
193
        XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed", 
194
                        vi->name, strlen(vi->name), (unsigned long) vi);
195

    
196
        return r;
197

    
198
}
199

    
200
static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
201
{
202
        int r = -1;
203

    
204
        XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)", 
205
                        vi->name, strlen(vi->name), (unsigned long) vi);
206
        r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
207
        while (r == -XHASH_ERESIZE) {
208
                xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
209
                xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, 0, NULL);
210
                if (!new_hashmap){
211
                        XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
212
                                        (unsigned long long) shift);
213
                        XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed", 
214
                                        vi->name, (unsigned long) vi);
215
                        return r;
216
                }
217
                vlmc->volumes = new_hashmap;
218
                r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
219
        }
220
        if (r < 0)
221
                XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed", 
222
                        vi->name, strlen(vi->name), (unsigned long) vi);
223
        else
224
                XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed", 
225
                        vi->name, strlen(vi->name), (unsigned long) vi);
226
        return r;
227
}
228

    
229
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
230

    
231
static int conclude_pr(struct peerd *peer, struct peer_req *pr)
232
{
233
        struct vlmcd *vlmc = __get_vlmcd(peer);
234
        struct vlmc_io *vio = __get_vlmcio(pr);
235
        char *target = xseg_get_target(peer->xseg, pr->req);
236
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
237

    
238
        XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);
239

    
240
        __set_vio_state(vio, CONCLUDED);
241
        if (vio->err)
242
                fail(peer, pr);
243
        else
244
                complete(peer, pr);
245

    
246
        if (vi){
247
                //assert vi->active_reqs > 0
248
                uint32_t ar = --vi->active_reqs;
249
                XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
250
                                vi, vi->name, ar, vi->pending_pr);
251
                if (!ar && vi->pending_pr)
252
                        do_accepted_pr(peer, vi->pending_pr);
253
        }
254
        XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
255
        return 0;
256
}
257

    
258
static int should_freeze_volume(struct xseg_request *req)
259
{
260
        if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
261
                (req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH)))
262
                return 1;
263
        return 0;
264
}
265

    
266
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
267
{
268
        struct vlmcd *vlmc = __get_vlmcd(peer);
269
        struct vlmc_io *vio = __get_vlmcio(pr);
270
        int r;
271
        xport p;
272
        char *target, *mtarget;
273
        void *dummy;
274

    
275
        struct volume_info *vi;
276

    
277
        XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
278
        target = xseg_get_target(peer->xseg, pr->req);
279
        if (!target){
280
                vio->err = 1;
281
                conclude_pr(peer, pr);
282
                return -1;
283
        }
284

    
285
        vi = find_volume_len(vlmc, target, pr->req->targetlen);
286
        if (!vi){
287
                XSEGLOG2(&lc, E, "Cannot find volume");
288
                XSEGLOG2(&lc, E, "Pr %lx", pr);
289
                vio->err = 1;
290
                conclude_pr(peer, pr);
291
                return -1;
292
        }
293

    
294
        if (should_freeze_volume(pr->req)){
295
                XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
296
                vi->flags |= VF_VOLUME_FREEZED;
297
                if (vi->active_reqs){
298
                        //assert vi->pending_pr == NULL;
299
                        XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
300
                                        vi->name, vi->active_reqs, pr);
301
                        vi->pending_pr = pr;
302
                        return 0;
303
                }
304
                else {
305
                        XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
306
                                        vi->name);
307
                        //assert vi->pending_pr == pr
308
                        vi->pending_pr = NULL;
309
                }
310

    
311
        }
312

    
313
        vi->active_reqs++;
314

    
315
        vio->err = 0; //reset error state
316

    
317
        if (pr->req->op == X_WRITE && !pr->req->size &&
318
                        (pr->req->flags & (XF_FLUSH|XF_FUA))){
319
                //hanlde flush requests here, so we don't mess with mapper
320
                //because of the -1 offset
321
                vi->flags &= ~VF_VOLUME_FREEZED;
322
                XSEGLOG2(&lc, I, "Completing flush request");
323
                pr->req->serviced = pr->req->size;
324
                conclude_pr(peer, pr);
325
                return 0;
326
        }
327

    
328
        vio->mreq = xseg_get_request(peer->xseg, pr->portno,
329
                                        vlmc->mportno, X_ALLOC);
330
        if (!vio->mreq)
331
                goto out_err;
332

    
333
        /* use datalen 0. let mapper allocate buffer space as needed */
334
        r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
335
        if (r < 0) {
336
                XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
337
                                vio->mreq, pr, vi->name);
338
                goto out_put;
339
        }
340
        mtarget = xseg_get_target(peer->xseg, vio->mreq);
341
        if (!mtarget)
342
                goto out_put;
343

    
344
        strncpy(mtarget, target, pr->req->targetlen);
345
        vio->mreq->size = pr->req->size;
346
        vio->mreq->offset = pr->req->offset;
347
        vio->mreq->flags = 0;
348
        switch (pr->req->op) {
349
                case X_READ: vio->mreq->op = X_MAPR; break;
350
                case X_WRITE: vio->mreq->op = X_MAPW; break;
351
                case X_INFO: vio->mreq->op = X_INFO; break;
352
                case X_CLOSE: vio->mreq->op = X_CLOSE; break;
353
                case X_OPEN: vio->mreq->op = X_OPEN; break;
354
                case X_SNAPSHOT:
355
                             //FIXME hack
356
                             vio->mreq->op = X_SNAPSHOT;
357
                             vio->mreq->data = pr->req->data;
358
                             break;
359
                default: goto out_put;
360
        }
361
        xseg_set_req_data(peer->xseg, vio->mreq, pr);
362
        __set_vio_state(vio, MAPPING);
363
        p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
364
        if (p == NoPort)
365
                goto out_unset;
366
        r = xseg_signal(peer->xseg, p);
367
        if (r < 0) {
368
                /* since submission is successful, just print a warning message */
369
                XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
370
        }
371

    
372
        XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
373
        return 0;
374

    
375
out_unset:
376
        xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
377
out_put:
378
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
379
out_err:
380
        vio->err = 1;
381
        XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
382
        conclude_pr(peer, pr);
383
        return -1;
384
}
385

    
386
static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
387
{
388
        XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
389
                        pr, vi, vi->name);
390
        if (!vi->pending_reqs){
391
                //allocate 8 as default. FIXME make it relevant to nr_ops;
392
                vi->pending_reqs = allocate_queue(8);
393
        }
394

    
395
        if (!vi->pending_reqs){
396
                XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
397
                                vi->name);
398
                XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
399
                                pr, vi, vi->name);
400
                return -1;
401
        }
402

    
403
        xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
404
        if (r == Noneidx){
405
                if (doubleup_queue(vi) < 0){
406
                        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
407
                                        pr, vi, vi->name);
408
                        return -1;
409
                }
410
                r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
411
        }
412

    
413
        if (r == Noneidx){
414
                XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
415
                                pr, vi, vi->name);
416
                return -1;
417
        }
418

    
419
        XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
420
                        pr, vi, vi->name);
421
        return 0;
422
}
423

    
424
static int handle_accepted(struct peerd *peer, struct peer_req *pr,
425
                                struct xseg_request *req)
426
{
427
        struct vlmc_io *vio = __get_vlmcio(pr);
428
        struct vlmcd *vlmc = __get_vlmcd(peer);
429
        char *target = xseg_get_target(peer->xseg, req);
430
        struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
431
        XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
432
        if (!vi){
433
                vi = malloc(sizeof(struct volume_info));
434
                if (!vi){
435
                        vio->err = 1;
436
                        conclude_pr(peer, pr);
437
                        return -1;
438
                }
439
                strncpy(vi->name, target, req->targetlen);
440
                vi->name[req->targetlen] = 0;
441
                vi->flags = 0;
442
                vi->pending_pr = NULL;
443
                vi->active_reqs = 0;
444
                vi->pending_reqs = 0;
445
                if (insert_volume(vlmc, vi) < 0){
446
                        vio->err = 1;
447
                        conclude_pr(peer, pr);
448
                        free(vi);
449
                        return -1;
450
                }
451
        }
452

    
453
        if (vi->flags & VF_VOLUME_FREEZED){
454
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
455
                                vi->name, vi);
456
                if (append_to_pending_reqs(vi, pr) < 0){
457
                        vio->err = 1;
458
                        conclude_pr(peer, pr);
459
                        return -1;
460
                }
461
                return 0;
462
        }
463

    
464
        return do_accepted_pr(peer, pr);
465
}
466

    
467

    
468
static int mapping_info(struct peerd *peer, struct peer_req *pr)
469
{
470
        struct vlmc_io *vio = __get_vlmcio(pr);
471
        struct xseg_request *req = pr->req;
472
        char *target;
473
        char buf[XSEG_MAX_TARGETLEN + 1];
474
        int r;
475

    
476
        if (vio->mreq->state & XS_FAILED){
477
                XSEGLOG2(&lc, E, "Info req %lx failed",
478
                                (unsigned long)vio->mreq);
479
                vio->err = 1;
480
        }
481
        else {
482
                if (req->datalen < sizeof(struct xseg_reply_info)) {
483
                        target = xseg_get_target(peer->xseg, req);
484
                        strncpy(buf, target, req->targetlen);
485
                        r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
486
                        if (r < 0) {
487
                                XSEGLOG2(&lc, E, "Cannot resize request");
488
                                vio->err = 1;
489
                                goto out;
490
                        }
491
                        target = xseg_get_target(peer->xseg, req);
492
                        strncpy(target, buf, req->targetlen);
493
                }
494
                struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
495
                char *data = xseg_get_data(peer->xseg, pr->req);
496
                struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
497
                xreply->size = xinfo->size;
498
        }
499
out:
500
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
501
        vio->mreq = NULL;
502
        conclude_pr(peer, pr);
503
        return 0;
504
}
505

    
506
static int mapping_open(struct peerd *peer, struct peer_req *pr)
507
{
508
        struct vlmc_io *vio = __get_vlmcio(pr);
509
        if (vio->mreq->state & XS_FAILED){
510
                XSEGLOG2(&lc, E, "Open req %lx failed",
511
                                (unsigned long)vio->mreq);
512
                vio->err = 1;
513
        }
514
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
515
        vio->mreq = NULL;
516
        conclude_pr(peer, pr);
517
        return 0;
518
}
519

    
520
static int mapping_close(struct peerd *peer, struct peer_req *pr)
521
{
522
        struct vlmcd *vlmc = __get_vlmcd(peer);
523
        struct vlmc_io *vio = __get_vlmcio(pr);
524
        if (vio->mreq->state & XS_FAILED){
525
                XSEGLOG2(&lc, E, "Close req %lx failed",
526
                                (unsigned long)vio->mreq);
527
                vio->err = 1;
528
        }
529
        char *target = xseg_get_target(peer->xseg, pr->req);
530
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
531

    
532
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
533
        vio->mreq = NULL;
534
        conclude_pr(peer, pr);
535

    
536
        //assert active_reqs == 1
537
        //assert volume freezed
538
        //unfreeze
539
        if (!vi){
540
                XSEGLOG2(&lc, E, "Volume has not volume info");
541
                return 0;
542
        }
543
        vi->flags &= ~ VF_VOLUME_FREEZED;
544
        if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
545
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
546
                                vi->name, vi);
547
                if (vi->pending_reqs)
548
                        xq_free(vi->pending_reqs);
549
                remove_volume(vlmc, vi);
550
                free(vi);
551
        }
552
        else {
553
                xqindex xqi;
554
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
555
                                vi->name, vi);
556
                while (!(vi->flags & VF_VOLUME_FREEZED) &&
557
                                (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
558
                        struct peer_req *ppr = (struct peer_req *) xqi;
559
                        do_accepted_pr(peer, ppr);
560
                }
561
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
562
                                vi->name, vi);
563
        }
564
        return 0;
565
}
566

    
567
static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
568
{
569
        struct vlmcd *vlmc = __get_vlmcd(peer);
570
        struct vlmc_io *vio = __get_vlmcio(pr);
571
        char *target = xseg_get_target(peer->xseg, pr->req);
572
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
573
        if (vio->mreq->state & XS_FAILED){
574
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
575
                                (unsigned long)vio->mreq, vio->mreq->op);
576
                vio->err = 1;
577
        }
578

    
579
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
580
        vio->mreq = NULL;
581
        conclude_pr(peer, pr);
582

    
583
        //assert volume freezed
584
        //unfreeze
585
        if (!vi){
586
                XSEGLOG2(&lc, E, "Volume has no volume info");
587
                return 0;
588
        }
589
        XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
590
        vi->flags &= ~ VF_VOLUME_FREEZED;
591

    
592
        xqindex xqi;
593
        while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
594
                        (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
595
                struct peer_req *ppr = (struct peer_req *) xqi;
596
                do_accepted_pr(peer, ppr);
597
        }
598
        return 0;
599
}
600

    
601
static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
602
{
603
        struct vlmcd *vlmc = __get_vlmcd(peer);
604
        struct vlmc_io *vio = __get_vlmcio(pr);
605
        struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
606
        uint64_t pos, datalen, offset;
607
        uint32_t targetlen;
608
        struct xseg_request *breq;
609
        char *target;
610
        int i,r;
611
        xport p;
612
        if (vio->mreq->state & XS_FAILED){
613
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
614
                                (unsigned long)vio->mreq, vio->mreq->op);
615
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
616
                vio->mreq = NULL;
617
                vio->err = 1;
618
                conclude_pr(peer, pr);
619
                return 0;
620
        }
621

    
622
        if (!mreply || !mreply->cnt){
623
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
624
                vio->mreq = NULL;
625
                vio->err = 1;
626
                conclude_pr(peer, pr);
627
                return -1;
628
        }
629

    
630
        vio->breq_len = mreply->cnt;
631
        vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
632
        if (!vio->breqs) {
633
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
634
                vio->mreq = NULL;
635
                vio->err = 1;
636
                conclude_pr(peer, pr);
637
                return -1;
638
        }
639

    
640
        pos = 0;
641
        __set_vio_state(vio, SERVING);
642
        for (i = 0; i < vio->breq_len; i++) {
643
                datalen = mreply->segs[i].size;
644
                offset = mreply->segs[i].offset;
645
                targetlen = mreply->segs[i].targetlen;
646
                breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
647
                if (!breq) {
648
                        vio->err = 1;
649
                        break;
650
                }
651
                r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
652
                if (r < 0) {
653
                        vio->err = 1;
654
                        xseg_put_request(peer->xseg, breq, pr->portno);
655
                        break;
656
                }
657
                breq->offset = offset;
658
                breq->size = datalen;
659
                breq->op = pr->req->op;
660
                target = xseg_get_target(peer->xseg, breq);
661
                if (!target) {
662
                        vio->err = 1;
663
                        xseg_put_request(peer->xseg, breq, pr->portno);
664
                        break;
665
                }
666
                strncpy(target, mreply->segs[i].target, targetlen);
667
                r = xseg_set_req_data(peer->xseg, breq, pr);
668
                if (r<0) {
669
                        vio->err = 1;
670
                        xseg_put_request(peer->xseg, breq, pr->portno);
671
                        break;
672
                }
673

    
674
                // this should work, right ?
675
                breq->data = pr->req->data + pos;
676
                pos += datalen;
677
                p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
678
                if (p == NoPort){
679
                        void *dummy;
680
                        vio->err = 1;
681
                        xseg_get_req_data(peer->xseg, breq, &dummy);
682
                        xseg_put_request(peer->xseg, breq, pr->portno);
683
                        break;
684
                }
685
                r = xseg_signal(peer->xseg, p);
686
                if (r < 0){
687
                        XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
688
                }
689
                vio->breqs[i] = breq;
690
        }
691
        vio->breq_cnt = i;
692
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
693
        vio->mreq = NULL;
694
        if (i == 0) {
695
                free(vio->breqs);
696
                vio->breqs = NULL;
697
                vio->err = 1;
698
                conclude_pr(peer, pr);
699
                return -1;
700
        }
701
        return 0;
702
}
703

    
704
static int handle_mapping(struct peerd *peer, struct peer_req *pr,
705
                                struct xseg_request *req)
706
{
707
        struct vlmc_io *vio = __get_vlmcio(pr);
708

    
709
        //assert vio>mreq == req
710
        if (vio->mreq != req){
711
                XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
712
                                (unsigned long)vio->mreq, (unsigned long)req,
713
                                vio->state, (unsigned long)vio->breqs[0]);
714
                return -1;
715
        }
716

    
717
        switch (vio->mreq->op){
718
                case X_INFO:
719
                        mapping_info(peer, pr);
720
                        break;
721
                case X_SNAPSHOT:
722
                        mapping_snapshot(peer, pr);
723
                        break;
724
                case X_CLOSE:
725
                        mapping_close(peer, pr);
726
                        break;
727
                case X_OPEN:
728
                        mapping_open(peer, pr);
729
                        break;
730
                case X_MAPR:
731
                case X_MAPW:
732
                        mapping_readwrite(peer, pr);
733
                        break;
734
                default:
735
                        XSEGLOG2(&lc, W, "Invalid mreq op");
736
                        //vio->err = 1;
737
                        //conclude_pr(peer, pr);
738
                        break;
739
        }
740

    
741
        return 0;
742
}
743

    
744
static int handle_serving(struct peerd *peer, struct peer_req *pr, 
745
                                struct xseg_request *req)
746
{
747
        struct vlmc_io *vio = __get_vlmcio(pr);
748
        struct vlmcd *vlmc = __get_vlmcd(peer);
749
        (void)vlmc;
750
        struct xseg_request *breq = req;
751

    
752
        if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
753
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
754
                                (unsigned long)req, req->op,
755
                                (unsigned long long)req->offset);
756
                vio->err = 1;
757
        } else {
758
                //assert breq->serviced == breq->size
759
                pr->req->serviced += breq->serviced;
760
        }
761
        xseg_put_request(peer->xseg, breq, pr->portno);
762

    
763
        if (!--vio->breq_cnt){
764
                __set_vio_state(vio, CONCLUDED);
765
                free(vio->breqs);
766
                vio->breqs = NULL;
767
                vio->breq_len = 0;
768
                conclude_pr(peer, pr);
769
        }
770
        return 0;
771
}
772

    
773
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
774
                enum dispatch_reason reason)
775
{
776
        struct vlmc_io *vio = __get_vlmcio(pr);
777
        struct vlmcd *vlmc = __get_vlmcd(peer);
778
        (void)vlmc;
779

    
780
        if (reason == dispatch_accept)
781
                //assert (pr->req == req)
782
                __set_vio_state(vio, ACCEPTED);
783

    
784
        enum io_state_enum state = __get_vio_state(vio);
785
        switch (state) {
786
                case ACCEPTED:
787
                        handle_accepted(peer, pr, req);
788
                        break;
789
                case MAPPING:
790
                        handle_mapping(peer, pr, req);
791
                        break;
792
                case SERVING:
793
                        handle_serving(peer, pr, req);
794
                        break;
795
                case CONCLUDED:
796
                        XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
797
                        break;
798
                default:
799
                        XSEGLOG2(&lc, E, "wtf dude? invalid state");
800
                        break;
801
        }
802
        return 0;
803
}
804

    
805

    
806
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
807
{
808
        struct vlmc_io *vio;
809
        struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
810
        int i, j;
811

    
812
        if (!vlmc) {
813
                XSEGLOG2(&lc, E, "Cannot alloc vlmc");
814
                return -1;
815
        }
816
        peer->priv = (void *) vlmc;
817

    
818
        vlmc->volumes = xhash_new(3, 0, STRING);
819
        if (!vlmc->volumes){
820
                XSEGLOG2(&lc, E, "Cannot alloc vlmc");
821
                return -1;
822
        }
823
        vlmc->mportno = NoPort;
824
        vlmc->bportno = NoPort;
825

    
826
        BEGIN_READ_ARGS(argc, argv);
827
        READ_ARG_ULONG("-mp", vlmc->mportno);
828
        READ_ARG_ULONG("-bp", vlmc->bportno);
829
        END_READ_ARGS();
830

    
831
        if (vlmc->bportno == NoPort) {
832
                XSEGLOG2(&lc, E, "bportno must be provided");
833
                usage(argv[0]);
834
                return -1;
835
        }
836
        if (vlmc->mportno == NoPort) {
837
                XSEGLOG2(&lc, E, "mportno must be provided");
838
                usage(argv[0]);
839
                return -1;
840
        }
841

    
842
        for (i = 0; i < peer->nr_ops; i++) {
843
                vio = malloc(sizeof(struct vlmc_io));
844
                if (!vio) {
845
                        break;
846
                }
847
                vio->mreq = NULL;
848
                vio->breqs = NULL;
849
                vio->breq_cnt = 0;
850
                vio->breq_len = 0;
851
                xlock_release(&vio->lock);
852
                peer->peer_reqs[i].priv = (void *) vio;
853
        }
854
        if (i < peer->nr_ops) {
855
                for (j = 0; j < i; j++) {
856
                        free(peer->peer_reqs[i].priv);
857
                }
858
                return -1;
859
        }
860

    
861

    
862
        const struct sched_param param = { .sched_priority = 99 };
863
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
864

    
865
        return 0;
866
}
867

    
868
void custom_peer_finalize(struct peerd *peer)
869
{
870
        return;
871
}