Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-vlmcd.c @ 6217821b

History | View | Annotate | Download (22.7 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <stdlib.h>
37
#include <unistd.h>
38
#include <xseg/xseg.h>
39
#include <xseg/protocol.h>
40
#include <peer.h>
41
#include <sched.h>
42
#include <sys/syscall.h>
43

    
44
/*
 * Lifecycle states of a vlmc I/O (see struct vlmc_io).
 * Normal read/write flow: ACCEPTED -> MAPPING -> SERVING -> CONCLUDED.
 * INFO/OPEN/CLOSE/SNAPSHOT requests conclude straight from MAPPING.
 */
enum io_state_enum {
        ACCEPTED = 0,   /* accepted from a port; nothing submitted yet */
        MAPPING = 1,    /* mapper request (vio->mreq) in flight */
        SERVING = 2,    /* blocker requests (vio->breqs) in flight */
        CONCLUDED = 3   /* set by conclude_pr just before fail()/complete() */
};
50

    
51
/* volume_info.flags bit: volume is frozen — newly accepted requests are
 * parked on pending_reqs (see handle_accepted) instead of being served. */
#define VF_VOLUME_FREEZED (1 << 0)
52

    
53
/* Per-volume bookkeeping, stored in vlmcd.volumes keyed by name. */
struct volume_info{
        char name[XSEG_MAX_TARGETLEN + 1];      /* NUL-terminated volume name */
        uint32_t flags;                 /* VF_* flags (VF_VOLUME_FREEZED) */
        uint32_t active_reqs;           /* requests currently being served */
        struct xq *pending_reqs;        /* reqs parked while frozen; lazily
                                           allocated by append_to_pending_reqs */
        struct peer_req *pending_pr;    /* freezing pr waiting for active_reqs
                                           to drain (resumed by conclude_pr) */
};
60

    
61
/* Global peer state, hung off peerd.priv (see __get_vlmcd). */
struct vlmcd {
        xport mportno;          /* mapper port (-mp) */
        xport bportno;          /* blocker port for data blocks (-bp) */
        xhash_t *volumes; //hash [volumename] -> struct volume_info
};
66

    
67
/* Per-request I/O context, hung off peer_req.priv (see __get_vlmcio). */
struct vlmc_io {
        int err;                /* sticky error flag; conclude_pr fails the pr when set */
        struct xlock lock;
        volatile enum io_state_enum state;      /* current io_state_enum */
        struct xseg_request *mreq;      /* outstanding mapper request, or NULL */
        struct xseg_request **breqs;    /* array of outstanding blocker requests */
        unsigned long breq_len, breq_cnt;       /* array length vs. still-in-flight count */
};
75

    
76
/* Print the vlmc-specific command-line options to stderr. */
void custom_peer_usage()
{
        static const char *help_text =
                "Custom peer options: \n"
                "-mp : mapper port\n"
                "-bp : blocker port for blocks\n"
                "\n";
        fprintf(stderr, "%s", help_text);
}
83

    
84
static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
85
{
86
        vio->state = state;
87
}
88

    
89
static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
90
{
91
        enum io_state_enum state;
92
        state = vio->state;
93
        return state;
94
}
95

    
96
static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
97
{
98
        return (struct vlmc_io *) pr->priv;
99
}
100

    
101
static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
102
{
103
        return (struct vlmcd *) peer->priv;
104
}
105

    
106
/*
 * Allocate and initialize an empty xq with capacity nr.
 * Returns NULL on failure. The caller owns both the struct and its
 * internal buffer (release with xq_free() followed by free()).
 */
static struct xq * allocate_queue(xqindex nr)
{
        struct xq *queue;

        queue = malloc(sizeof(struct xq));
        if (!queue)
                return NULL;

        if (!xq_alloc_empty(queue, nr)) {
                free(queue);
                return NULL;
        }

        return queue;
}
117

    
118
/*
 * Double the capacity of vi->pending_reqs (which must be non-NULL).
 * On success the old queue (struct and buffer) is freed and replaced.
 * On failure the original queue is left intact. Returns 0 / -1.
 */
static int doubleup_queue(struct volume_info *vi)
{
        //assert vi->pending_reqs
        XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
        struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
        if (!newq){
                XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
                                vi->name);
                return -1;
        }

        /* migrate queued entries into the larger queue */
        if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
                xq_free(newq);
                free(newq);
                XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
                                vi->name);
                return -1;
        }
        /* release both the old queue's buffer and the struct itself */
        xq_free(vi->pending_reqs);
        free(vi->pending_reqs);
        vi->pending_reqs = newq;
        XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
        return 0;
}
142

    
143
static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
144
{
145
        struct volume_info *vi = NULL;
146
        XSEGLOG2(&lc, D, "looking up volume %s", volume);
147
        int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
148
                        (xhashidx *) &vi);
149
        if (r < 0){
150
                XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
151
                return NULL;
152
        }
153
        XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
154
                        volume, (unsigned long)vi);
155
        return vi;
156
}
157

    
158
static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
159
                                                uint32_t targetlen)
160
{
161
        char buf[XSEG_MAX_TARGETLEN+1];
162
        strncpy(buf, target, targetlen);
163
        buf[targetlen] = 0;
164
        XSEGLOG2(&lc, D, "looking up volume %s, len %u",
165
                        buf, targetlen);
166
        return find_volume(vlmc, buf);
167

    
168
}
169

    
170
/*
 * Insert vi into the volumes hash, keyed by vi->name.
 * Grows the hash and retries on XHASH_ERESIZE. Fails (returns < 0)
 * if a volume with the same name already exists or growth fails.
 * Returns the final xhash_insert result (0 on success).
 */
static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
        int r = -1;

        if (find_volume(vlmc, vi->name)){
                XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
                return r;
        }

        XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
                        vi->name, strlen(vi->name), (unsigned long) vi);
        r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
        while (r == -XHASH_ERESIZE) {
                /* table full: grow it and retry the insert */
                xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
                xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
                                        (unsigned long long) shift);
                        return r;
                }
                vlmc->volumes = new_hashmap;
                r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
        }
        XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed",
                        vi->name, strlen(vi->name), (unsigned long) vi);

        return r;

}
199

    
200
/*
 * Remove vi from the volumes hash, shrinking the hash and retrying on
 * XHASH_ERESIZE. Does NOT free vi itself — the caller owns it.
 * Returns the final xhash_delete result (< 0 on failure).
 */
static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
{
        int r = -1;

        XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)",
                        vi->name, strlen(vi->name), (unsigned long) vi);
        r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
        while (r == -XHASH_ERESIZE) {
                /* table sparse: shrink it and retry the delete */
                xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
                xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
                                        (unsigned long long) shift);
                        XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed",
                                        vi->name, (unsigned long) vi);
                        return r;
                }
                vlmc->volumes = new_hashmap;
                r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
        }
        if (r < 0)
                XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed",
                        vi->name, strlen(vi->name), (unsigned long) vi);
        else
                XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed",
                        vi->name, strlen(vi->name), (unsigned long) vi);
        return r;
}
228

    
229
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
230

    
231
/*
 * Finish a peer request: mark its vio CONCLUDED and reply with fail()
 * or complete() depending on vio->err. Then drop the volume's
 * active_reqs count; when the last active request drains and a
 * freezing pr is parked on vi->pending_pr, resume it via
 * do_accepted_pr (mutual recursion with do_accepted_pr).
 * Always returns 0.
 */
static int conclude_pr(struct peerd *peer, struct peer_req *pr)
{
        struct vlmcd *vlmc = __get_vlmcd(peer);
        struct vlmc_io *vio = __get_vlmcio(pr);
        char *target = xseg_get_target(peer->xseg, pr->req);
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

        XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);

        __set_vio_state(vio, CONCLUDED);
        if (vio->err)
                fail(peer, pr);
        else
                complete(peer, pr);

        /* vi may be NULL, e.g. when the volume was already removed */
        if (vi){
                //assert vi->active_reqs > 0
                uint32_t ar = --vi->active_reqs;
                XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
                                vi, vi->name, ar, vi->pending_pr);
                /* last in-flight request gone: kick the parked freezing pr */
                if (!ar && vi->pending_pr)
                        do_accepted_pr(peer, vi->pending_pr);
        }
        XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
        return 0;
}
257

    
258
static int should_freeze_volume(struct xseg_request *req)
259
{
260
        if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
261
                (req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH)))
262
                return 1;
263
        return 0;
264
}
265

    
266
/*
 * Process an accepted (or resumed) peer request.
 *
 * Steps:
 *  1. Resolve the volume_info for the request's target.
 *  2. If the request must freeze the volume (close/snapshot/flush),
 *     set VF_VOLUME_FREEZED; when other requests are still in flight,
 *     park this pr on vi->pending_pr and return — conclude_pr resumes
 *     it when active_reqs drains to 0.
 *  3. Handle zero-sized flush writes locally (no mapper round-trip),
 *     then drain any requests queued while the volume was frozen.
 *  4. Otherwise build a mapper request mirroring pr->req, translate
 *     the op (X_READ->X_MAPR, X_WRITE->X_MAPW, others pass through),
 *     set state to MAPPING and submit it to vlmc->mportno.
 *
 * Returns 0 on success; -1 after failing and concluding the pr.
 */
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
{
        struct vlmcd *vlmc = __get_vlmcd(peer);
        struct vlmc_io *vio = __get_vlmcio(pr);
        int r;
        xport p;
        char *target, *mtarget;
        void *dummy;

        struct volume_info *vi;

        XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
        target = xseg_get_target(peer->xseg, pr->req);
        if (!target){
                vio->err = 1;
                conclude_pr(peer, pr);
                return -1;
        }

        vi = find_volume_len(vlmc, target, pr->req->targetlen);
        if (!vi){
                XSEGLOG2(&lc, E, "Cannot find volume");
                XSEGLOG2(&lc, E, "Pr %lx", pr);
                vio->err = 1;
                conclude_pr(peer, pr);
                return -1;
        }

        if (should_freeze_volume(pr->req)){
                XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
                vi->flags |= VF_VOLUME_FREEZED;
                if (vi->active_reqs){
                        //assert vi->pending_pr == NULL;
                        XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
                                        vi->name, vi->active_reqs, pr);
                        /* park until the in-flight requests drain */
                        vi->pending_pr = pr;
                        return 0;
                }
                else {
                        XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
                                        vi->name);
                        //assert vi->pending_pr == pr
                        vi->pending_pr = NULL;
                }

        }

        vi->active_reqs++;

        vio->err = 0; //reset error state

        if (pr->req->op == X_WRITE && !pr->req->size &&
                        (pr->req->flags & (XF_FLUSH|XF_FUA))){
                //handle flush requests here, so we don't mess with mapper
                //because of the -1 offset
                vi->flags &= ~VF_VOLUME_FREEZED;
                XSEGLOG2(&lc, I, "Completing flush request");
                pr->req->serviced = pr->req->size;
                conclude_pr(peer, pr);
                /* drain requests queued while the volume was frozen;
                 * stop early if a drained request re-freezes the volume */
                xqindex xqi;
                while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
                                (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
                        struct peer_req *ppr = (struct peer_req *) xqi;
                        do_accepted_pr(peer, ppr);
                }
                return 0;
        }

        vio->mreq = xseg_get_request(peer->xseg, pr->portno,
                                        vlmc->mportno, X_ALLOC);
        if (!vio->mreq)
                goto out_err;

        /* use datalen 0. let mapper allocate buffer space as needed */
        r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
        if (r < 0) {
                XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
                                vio->mreq, pr, vi->name);
                goto out_put;
        }
        mtarget = xseg_get_target(peer->xseg, vio->mreq);
        if (!mtarget)
                goto out_put;

        strncpy(mtarget, target, pr->req->targetlen);
        vio->mreq->size = pr->req->size;
        vio->mreq->offset = pr->req->offset;
        vio->mreq->flags = 0;
        /* translate the client op to the corresponding mapper op */
        switch (pr->req->op) {
                case X_READ: vio->mreq->op = X_MAPR; break;
                case X_WRITE: vio->mreq->op = X_MAPW; break;
                case X_INFO: vio->mreq->op = X_INFO; break;
                case X_CLOSE: vio->mreq->op = X_CLOSE; break;
                case X_OPEN: vio->mreq->op = X_OPEN; break;
                case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
                default: goto out_put;
        }
        xseg_set_req_data(peer->xseg, vio->mreq, pr);
        __set_vio_state(vio, MAPPING);
        p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
        if (r < 0) {
                /* since submission is successful, just print a warning message */
                XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
        }

        XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
        return 0;

out_unset:
        xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
out_put:
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
out_err:
        vio->err = 1;
        XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
        conclude_pr(peer, pr);
        return -1;
}
387

    
388
/*
 * Park pr on the volume's pending_reqs queue (used while the volume
 * is frozen). Lazily allocates the queue and doubles its capacity
 * once if the first append fails. Returns 0 on success, -1 on
 * allocation/append failure.
 */
static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
{
        XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
                        pr, vi, vi->name);
        if (!vi->pending_reqs){
                //allocate 8 as default. FIXME make it relevant to nr_ops;
                vi->pending_reqs = allocate_queue(8);
        }

        if (!vi->pending_reqs){
                XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
                                vi->name);
                XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                                pr, vi, vi->name);
                return -1;
        }

        xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
        if (r == Noneidx){
                /* queue full: grow it and retry once */
                if (doubleup_queue(vi) < 0){
                        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                                        pr, vi, vi->name);
                        return -1;
                }
                r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
        }

        if (r == Noneidx){
                XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
                                pr, vi, vi->name);
                return -1;
        }

        XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
                        pr, vi, vi->name);
        return 0;
}
425

    
426
/*
 * Entry point for a freshly accepted request (state ACCEPTED).
 * Creates the volume_info on first sight of a volume, queues the
 * request if the volume is frozen, and otherwise hands it to
 * do_accepted_pr. Returns 0 on success, -1 after failing the pr.
 */
static int handle_accepted(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
{
        struct vlmc_io *vio = __get_vlmcio(pr);
        struct vlmcd *vlmc = __get_vlmcd(peer);
        char *target = xseg_get_target(peer->xseg, req);
        struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
        XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
        if (!vi){
                /* first request for this volume: create its volume_info */
                vi = malloc(sizeof(struct volume_info));
                if (!vi){
                        vio->err = 1;
                        conclude_pr(peer, pr);
                        return -1;
                }
                /* NOTE(review): assumes req->targetlen <= XSEG_MAX_TARGETLEN
                 * so the copy and terminator fit in vi->name — confirm */
                strncpy(vi->name, target, req->targetlen);
                vi->name[req->targetlen] = 0;
                vi->flags = 0;
                vi->pending_pr = NULL;
                vi->active_reqs = 0;
                vi->pending_reqs = 0;
                if (insert_volume(vlmc, vi) < 0){
                        vio->err = 1;
                        conclude_pr(peer, pr);
                        free(vi);
                        return -1;
                }
        }

        if (vi->flags & VF_VOLUME_FREEZED){
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
                                vi->name, vi);
                if (append_to_pending_reqs(vi, pr) < 0){
                        vio->err = 1;
                        conclude_pr(peer, pr);
                        return -1;
                }
                return 0;
        }

        return do_accepted_pr(peer, pr);
}
468

    
469

    
470
/*
 * Completion path for an X_INFO mapper reply: copy the reported size
 * from the mapper's xseg_reply_info into pr->req's data buffer, then
 * release mreq and conclude the pr. Always returns 0.
 * NOTE(review): assumes pr->req's data buffer is large enough to hold
 * a struct xseg_reply_info — confirm against callers.
 */
static int mapping_info(struct peerd *peer, struct peer_req *pr)
{
        struct vlmc_io *vio = __get_vlmcio(pr);
        if (vio->mreq->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Info req %lx failed",
                                (unsigned long)vio->mreq);
                vio->err = 1;
        }
        else {
                struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
                char *data = xseg_get_data(peer->xseg, pr->req);
                struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
                xreply->size = xinfo->size;
        }
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        conclude_pr(peer, pr);
        return 0;
}
489

    
490
/*
 * Completion path for an X_OPEN mapper reply: record failure if any,
 * release mreq and conclude the pr. Always returns 0.
 */
static int mapping_open(struct peerd *peer, struct peer_req *pr)
{
        struct vlmc_io *vio = __get_vlmcio(pr);
        if (vio->mreq->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Open req %lx failed",
                                (unsigned long)vio->mreq);
                vio->err = 1;
        }
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        conclude_pr(peer, pr);
        return 0;
}
503

    
504
static int mapping_close(struct peerd *peer, struct peer_req *pr)
505
{
506
        struct vlmcd *vlmc = __get_vlmcd(peer);
507
        struct vlmc_io *vio = __get_vlmcio(pr);
508
        if (vio->mreq->state & XS_FAILED){
509
                XSEGLOG2(&lc, E, "Close req %lx failed",
510
                                (unsigned long)vio->mreq);
511
                vio->err = 1;
512
        }
513
        char *target = xseg_get_target(peer->xseg, pr->req);
514
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
515

    
516
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
517
        vio->mreq = NULL;
518
        conclude_pr(peer, pr);
519

    
520
        //assert active_reqs == 1
521
        //assert volume freezed
522
        //unfreeze
523
        if (!vi){
524
                XSEGLOG2(&lc, E, "Volume has not volume info");
525
                return 0;
526
        }
527
        vi->flags &= ~ VF_VOLUME_FREEZED;
528
        if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
529
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
530
                                vi->name, vi);
531
                if (vi->pending_reqs)
532
                        xq_free(vi->pending_reqs);
533
                remove_volume(vlmc, vi);
534
                free(vi);
535
        }
536
        else {
537
                xqindex xqi;
538
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
539
                                vi->name, vi);
540
                while (!(vi->flags & VF_VOLUME_FREEZED) &&
541
                                (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
542
                        struct peer_req *ppr = (struct peer_req *) xqi;
543
                        do_accepted_pr(peer, ppr);
544
                }
545
                XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
546
                                vi->name, vi);
547
        }
548
        return 0;
549
}
550

    
551
/*
 * Completion path for an X_SNAPSHOT mapper reply. On success, resizes
 * pr->req so its data area can carry the xseg_reply_snapshot (saving
 * and restoring the target name around the resize) and copies the
 * mapper's reply into it. Then releases mreq, concludes the pr,
 * unfreezes the volume and drains any requests queued while frozen.
 * Always returns 0.
 */
static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
{
        struct vlmcd *vlmc = __get_vlmcd(peer);
        struct vlmc_io *vio = __get_vlmcio(pr);
        char *target = xseg_get_target(peer->xseg, pr->req);
        struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
        if (vio->mreq->state & XS_FAILED){
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                                (unsigned long)vio->mreq, vio->mreq->op);
                vio->err = 1;
        }
        else {
                struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
                /* save the target name: xseg_resize_request may relocate it */
                char buf[XSEG_MAX_TARGETLEN];
                strncpy(buf, target, pr->req->targetlen);
                int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
                if (r < 0) {
                        XSEGLOG2(&lc, E, "Cannot resize request");
                        vio->err = 1;
                }
                else {
                        /* restore target, then copy the snapshot reply over */
                        target = xseg_get_target(peer->xseg, pr->req);
                        strncpy(target, buf, pr->req->targetlen);
                        char *data = xseg_get_data(peer->xseg, pr->req);
                        struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
                        *xsnapshot = *xreply;
                }
        }

        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        conclude_pr(peer, pr);

        //assert volume freezed
        //unfreeze
        if (!vi){
                XSEGLOG2(&lc, E, "Volume has no volume info");
                return 0;
        }
        XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
        vi->flags &= ~ VF_VOLUME_FREEZED;

        /* drain queued requests; stop early if one re-freezes the volume */
        xqindex xqi;
        while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
                        (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
                struct peer_req *ppr = (struct peer_req *) xqi;
                do_accepted_pr(peer, ppr);
        }
        return 0;
}
601

    
602
/*
 * Completion path for an X_MAPR/X_MAPW mapper reply. Fans the original
 * request out into one blocker request per mapped segment: for each
 * segment in the xseg_reply_map, a breq is prepared with the segment's
 * target/offset/size and submitted to vlmc->bportno, sharing pr->req's
 * data buffer at the running offset. Tracks submitted requests in
 * vio->breqs / vio->breq_cnt for handle_serving. A mid-loop failure
 * stops submission; already-submitted breqs keep running and vio->err
 * marks the pr failed when they drain. Returns 0, or -1 when nothing
 * could be submitted (pr is concluded with error).
 */
static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
{
        struct vlmcd *vlmc = __get_vlmcd(peer);
        struct vlmc_io *vio = __get_vlmcio(pr);
        struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
        uint64_t pos, datalen, offset;
        uint32_t targetlen;
        struct xseg_request *breq;
        char *target;
        int i,r;
        xport p;
        if (vio->mreq->state & XS_FAILED){
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                                (unsigned long)vio->mreq, vio->mreq->op);
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                vio->err = 1;
                conclude_pr(peer, pr);
                return 0;
        }

        /* an empty map reply is an error */
        if (!mreply || !mreply->cnt){
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                vio->err = 1;
                conclude_pr(peer, pr);
                return -1;
        }

        vio->breq_len = mreply->cnt;
        vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
        if (!vio->breqs) {
                xseg_put_request(peer->xseg, vio->mreq, pr->portno);
                vio->mreq = NULL;
                vio->err = 1;
                conclude_pr(peer, pr);
                return -1;
        }

        pos = 0;
        __set_vio_state(vio, SERVING);
        /* one blocker request per mapped segment */
        for (i = 0; i < vio->breq_len; i++) {
                datalen = mreply->segs[i].size;
                offset = mreply->segs[i].offset;
                targetlen = mreply->segs[i].targetlen;
                breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
                if (!breq) {
                        vio->err = 1;
                        break;
                }
                r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
                if (r < 0) {
                        vio->err = 1;
                        xseg_put_request(peer->xseg, breq, pr->portno);
                        break;
                }
                breq->offset = offset;
                breq->size = datalen;
                breq->op = pr->req->op;
                target = xseg_get_target(peer->xseg, breq);
                if (!target) {
                        vio->err = 1;
                        xseg_put_request(peer->xseg, breq, pr->portno);
                        break;
                }
                strncpy(target, mreply->segs[i].target, targetlen);
                r = xseg_set_req_data(peer->xseg, breq, pr);
                if (r<0) {
                        vio->err = 1;
                        xseg_put_request(peer->xseg, breq, pr->portno);
                        break;
                }

                /* share pr->req's data buffer at the running offset —
                 * NOTE(review): relies on xseg tolerating an externally
                 * assigned data pointer; confirm against xseg internals */
                breq->data = pr->req->data + pos;
                pos += datalen;
                p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
                if (p == NoPort){
                        void *dummy;
                        vio->err = 1;
                        xseg_get_req_data(peer->xseg, breq, &dummy);
                        xseg_put_request(peer->xseg, breq, pr->portno);
                        break;
                }
                r = xseg_signal(peer->xseg, p);
                if (r < 0){
                        XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
                }
                vio->breqs[i] = breq;
        }
        /* i = number of breqs actually submitted */
        vio->breq_cnt = i;
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        vio->mreq = NULL;
        if (i == 0) {
                /* nothing submitted: fail the pr immediately */
                free(vio->breqs);
                vio->breqs = NULL;
                vio->err = 1;
                conclude_pr(peer, pr);
                return -1;
        }
        return 0;
}
704

    
705
/*
 * Dispatch a mapper reply (state MAPPING) to the handler matching the
 * mapper op. req must be the outstanding vio->mreq; anything else is
 * logged and rejected (-1). Returns 0 otherwise.
 */
static int handle_mapping(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
{
        struct vlmc_io *vio = __get_vlmcio(pr);

        //assert vio>mreq == req
        if (vio->mreq != req){
                XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
                                (unsigned long)vio->mreq, (unsigned long)req,
                                vio->state, (unsigned long)vio->breqs[0]);
                return -1;
        }

        switch (vio->mreq->op){
                case X_INFO:
                        mapping_info(peer, pr);
                        break;
                case X_SNAPSHOT:
                        mapping_snapshot(peer, pr);
                        break;
                case X_CLOSE:
                        mapping_close(peer, pr);
                        break;
                case X_OPEN:
                        mapping_open(peer, pr);
                        break;
                case X_MAPR:
                case X_MAPW:
                        mapping_readwrite(peer, pr);
                        break;
                default:
                        XSEGLOG2(&lc, W, "Invalid mreq op");
                        //vio->err = 1;
                        //conclude_pr(peer, pr);
                        break;
        }

        return 0;
}
744

    
745
/*
 * Handle one blocker reply (state SERVING). Accumulates the served
 * byte count (or marks vio->err on failure), releases the breq, and
 * when the last outstanding breq drains, frees the breqs array and
 * concludes the pr. Always returns 0.
 */
static int handle_serving(struct peerd *peer, struct peer_req *pr,
                                struct xseg_request *req)
{
        struct vlmc_io *vio = __get_vlmcio(pr);
        struct vlmcd *vlmc = __get_vlmcd(peer);
        (void)vlmc;
        struct xseg_request *breq = req;

        if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
                XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
                                (unsigned long)req, req->op,
                                (unsigned long long)req->offset);
                vio->err = 1;
        } else {
                //assert breq->serviced == breq->size
                pr->req->serviced += breq->serviced;
        }
        xseg_put_request(peer->xseg, breq, pr->portno);

        /* last blocker reply: tear down and conclude */
        if (!--vio->breq_cnt){
                __set_vio_state(vio, CONCLUDED);
                free(vio->breqs);
                vio->breqs = NULL;
                vio->breq_len = 0;
                conclude_pr(peer, pr);
        }
        return 0;
}
773

    
774
/*
 * Peer entry point: route a dispatched xseg request by the vio's
 * current state. A fresh accept resets the state to ACCEPTED; mapper
 * replies arrive while MAPPING and blocker replies while SERVING.
 * Always returns 0.
 */
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
                enum dispatch_reason reason)
{
        struct vlmc_io *vio = __get_vlmcio(pr);
        struct vlmcd *vlmc = __get_vlmcd(peer);
        (void)vlmc;

        if (reason == dispatch_accept)
                //assert (pr->req == req)
                __set_vio_state(vio, ACCEPTED);

        enum io_state_enum state = __get_vio_state(vio);
        switch (state) {
                case ACCEPTED:
                        handle_accepted(peer, pr, req);
                        break;
                case MAPPING:
                        handle_mapping(peer, pr, req);
                        break;
                case SERVING:
                        handle_serving(peer, pr, req);
                        break;
                case CONCLUDED:
                        XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
                        break;
                default:
                        XSEGLOG2(&lc, E, "wtf dude? invalid state");
                        break;
        }
        return 0;
}
805

    
806

    
807
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
808
{
809
        struct vlmc_io *vio;
810
        struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
811
        int i, j;
812

    
813
        if (!vlmc) {
814
                XSEGLOG2(&lc, E, "Cannot alloc vlmc");
815
                return -1;
816
        }
817
        peer->priv = (void *) vlmc;
818

    
819
        vlmc->volumes = xhash_new(3, STRING);
820
        if (!vlmc->volumes){
821
                XSEGLOG2(&lc, E, "Cannot alloc vlmc");
822
                return -1;
823
        }
824
        vlmc->mportno = NoPort;
825
        vlmc->bportno = NoPort;
826

    
827
        BEGIN_READ_ARGS(argc, argv);
828
        READ_ARG_ULONG("-mp", vlmc->mportno);
829
        READ_ARG_ULONG("-bp", vlmc->bportno);
830
        END_READ_ARGS();
831

    
832
        if (vlmc->bportno == NoPort) {
833
                XSEGLOG2(&lc, E, "bportno must be provided");
834
                usage(argv[0]);
835
                return -1;
836
        }
837
        if (vlmc->mportno == NoPort) {
838
                XSEGLOG2(&lc, E, "mportno must be provided");
839
                usage(argv[0]);
840
                return -1;
841
        }
842

    
843
        for (i = 0; i < peer->nr_ops; i++) {
844
                vio = malloc(sizeof(struct vlmc_io));
845
                if (!vio) {
846
                        break;
847
                }
848
                vio->mreq = NULL;
849
                vio->breqs = NULL;
850
                vio->breq_cnt = 0;
851
                vio->breq_len = 0;
852
                xlock_release(&vio->lock);
853
                peer->peer_reqs[i].priv = (void *) vio;
854
        }
855
        if (i < peer->nr_ops) {
856
                for (j = 0; j < i; j++) {
857
                        free(peer->peer_reqs[i].priv);
858
                }
859
                return -1;
860
        }
861

    
862

    
863
        const struct sched_param param = { .sched_priority = 99 };
864
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
865

    
866
        return 0;
867
}
868

    
869
/* No per-peer teardown is needed; resources persist until process exit. */
void custom_peer_finalize(struct peerd *peer)
{
        (void)peer;
}