Merge branch 'hotfix-0.3.5'
[archipelago] / xseg / peers / user / mt-vlmcd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <xseg/protocol.h>
40 #include <peer.h>
41 #include <sched.h>
42 #include <sys/syscall.h>
43
44 enum io_state_enum {
45         ACCEPTED = 0,
46         MAPPING = 1,
47         SERVING = 2,
48         CONCLUDED = 3
49 };
50
51 #define VF_VOLUME_FREEZED (1 << 0)
52
53 struct volume_info{
54         char name[XSEG_MAX_TARGETLEN + 1];
55         uint32_t flags;
56         uint32_t active_reqs;
57         struct xq *pending_reqs;
58         struct peer_req *pending_pr;
59 };
60
61 struct vlmcd {
62         xport mportno;
63         xport bportno;
64         xhash_t *volumes; //hash [volumename] -> struct volume_info
65 };
66
67 struct vlmc_io {
68         int err;
69         struct xlock lock;
70         volatile enum io_state_enum state;
71         struct xseg_request *mreq;
72         struct xseg_request **breqs;
73         unsigned long breq_len, breq_cnt;
74 };
75
76 void custom_peer_usage()
77 {
78         fprintf(stderr, "Custom peer options: \n"
79                         "-mp : mapper port\n"
80                         "-bp : blocker port for blocks\n"
81                         "\n");
82 }
83
84 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
85 {
86         vio->state = state;
87 }
88
89 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
90 {
91         enum io_state_enum state;
92         state = vio->state;
93         return state;
94 }
95
96 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
97 {
98         return (struct vlmc_io *) pr->priv;
99 }
100
101 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
102 {
103         return (struct vlmcd *) peer->priv;
104 }
105
106 static struct xq * allocate_queue(xqindex nr)
107 {
108         struct xq *q = malloc(sizeof(struct xq));
109         if (!q)
110                 return NULL;
111         if (!xq_alloc_empty(q, nr)){
112                 free(q);
113                 return NULL;
114         }
115         return q;
116 }
117
118 static int doubleup_queue(struct volume_info *vi)
119 {
120         //assert vi->pending_reqs
121         XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
122         struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
123         if (!newq){
124                 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
125                                 vi->name);
126                 return -1;
127         }
128
129         if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
130                 xq_free(newq);
131                 free(newq);
132                 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
133                                 vi->name);
134                 return -1;
135         }
136         xq_free(vi->pending_reqs);
137         free(vi->pending_reqs);
138         vi->pending_reqs = newq;
139         XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
140         return 0;
141 }
142
143 static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
144 {
145         struct volume_info *vi = NULL;
146         XSEGLOG2(&lc, D, "looking up volume %s", volume);
147         int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
148                         (xhashidx *) &vi);
149         if (r < 0){
150                 XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
151                 return NULL;
152         }
153         XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
154                         volume, (unsigned long)vi);
155         return vi;
156 }
157
158 static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
159                                                 uint32_t targetlen)
160 {
161         char buf[XSEG_MAX_TARGETLEN+1];
162         strncpy(buf, target, targetlen);
163         buf[targetlen] = 0;
164         XSEGLOG2(&lc, D, "looking up volume %s, len %u",
165                         buf, targetlen);
166         return find_volume(vlmc, buf);
167
168 }
169
170 static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
171 {
172         int r = -1;
173
174         if (find_volume(vlmc, vi->name)){
175                 XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
176                 return r;
177         }
178
179         XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)", 
180                         vi->name, strlen(vi->name), (unsigned long) vi);
181         r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
182         while (r == -XHASH_ERESIZE) {
183                 xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
184                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
185                 if (!new_hashmap){
186                         XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
187                                         (unsigned long long) shift);
188                         return r;
189                 }
190                 vlmc->volumes = new_hashmap;
191                 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
192         }
193         XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed", 
194                         vi->name, strlen(vi->name), (unsigned long) vi);
195
196         return r;
197
198 }
199
200 static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
201 {
202         int r = -1;
203
204         XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)", 
205                         vi->name, strlen(vi->name), (unsigned long) vi);
206         r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
207         while (r == -XHASH_ERESIZE) {
208                 xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
209                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
210                 if (!new_hashmap){
211                         XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
212                                         (unsigned long long) shift);
213                         XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed", 
214                                         vi->name, (unsigned long) vi);
215                         return r;
216                 }
217                 vlmc->volumes = new_hashmap;
218                 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
219         }
220         if (r < 0)
221                 XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed", 
222                         vi->name, strlen(vi->name), (unsigned long) vi);
223         else
224                 XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed", 
225                         vi->name, strlen(vi->name), (unsigned long) vi);
226         return r;
227 }
228
229 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
230
231 static int conclude_pr(struct peerd *peer, struct peer_req *pr)
232 {
233         struct vlmcd *vlmc = __get_vlmcd(peer);
234         struct vlmc_io *vio = __get_vlmcio(pr);
235         char *target = xseg_get_target(peer->xseg, pr->req);
236         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
237
238         XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);
239
240         __set_vio_state(vio, CONCLUDED);
241         if (vio->err)
242                 fail(peer, pr);
243         else
244                 complete(peer, pr);
245
246         if (vi){
247                 //assert vi->active_reqs > 0
248                 uint32_t ar = --vi->active_reqs;
249                 XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
250                                 vi, vi->name, ar, vi->pending_pr);
251                 if (!ar && vi->pending_pr)
252                         do_accepted_pr(peer, vi->pending_pr);
253         }
254         XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
255         return 0;
256 }
257
258 static int should_freeze_volume(struct xseg_request *req)
259 {
260         if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
261                 (req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH)))
262                 return 1;
263         return 0;
264 }
265
266 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
267 {
268         struct vlmcd *vlmc = __get_vlmcd(peer);
269         struct vlmc_io *vio = __get_vlmcio(pr);
270         int r;
271         xport p;
272         char *target, *mtarget;
273         void *dummy;
274
275         struct volume_info *vi;
276
277         XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
278         target = xseg_get_target(peer->xseg, pr->req);
279         if (!target){
280                 vio->err = 1;
281                 conclude_pr(peer, pr);
282                 return -1;
283         }
284
285         vi = find_volume_len(vlmc, target, pr->req->targetlen);
286         if (!vi){
287                 XSEGLOG2(&lc, E, "Cannot find volume");
288                 XSEGLOG2(&lc, E, "Pr %lx", pr);
289                 vio->err = 1;
290                 conclude_pr(peer, pr);
291                 return -1;
292         }
293
294         if (should_freeze_volume(pr->req)){
295                 XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
296                 vi->flags |= VF_VOLUME_FREEZED;
297                 if (vi->active_reqs){
298                         //assert vi->pending_pr == NULL;
299                         XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
300                                         vi->name, vi->active_reqs, pr);
301                         vi->pending_pr = pr;
302                         return 0;
303                 }
304                 else {
305                         XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
306                                         vi->name);
307                         //assert vi->pending_pr == pr
308                         vi->pending_pr = NULL;
309                 }
310
311         }
312
313         vi->active_reqs++;
314
315         vio->err = 0; //reset error state
316
317         if (pr->req->op == X_WRITE && !pr->req->size &&
318                         (pr->req->flags & (XF_FLUSH|XF_FUA))){
319                 //hanlde flush requests here, so we don't mess with mapper
320                 //because of the -1 offset
321                 vi->flags &= ~VF_VOLUME_FREEZED;
322                 XSEGLOG2(&lc, I, "Completing flush request");
323                 pr->req->serviced = pr->req->size;
324                 conclude_pr(peer, pr);
325                 xqindex xqi;
326                 while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
327                                 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
328                         struct peer_req *ppr = (struct peer_req *) xqi;
329                         do_accepted_pr(peer, ppr);
330                 }
331                 return 0;
332         }
333
334         vio->mreq = xseg_get_request(peer->xseg, pr->portno,
335                                         vlmc->mportno, X_ALLOC);
336         if (!vio->mreq)
337                 goto out_err;
338
339         /* use datalen 0. let mapper allocate buffer space as needed */
340         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
341         if (r < 0) {
342                 XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
343                                 vio->mreq, pr, vi->name);
344                 goto out_put;
345         }
346         mtarget = xseg_get_target(peer->xseg, vio->mreq);
347         if (!mtarget)
348                 goto out_put;
349
350         strncpy(mtarget, target, pr->req->targetlen);
351         vio->mreq->size = pr->req->size;
352         vio->mreq->offset = pr->req->offset;
353         vio->mreq->flags = 0;
354         switch (pr->req->op) {
355                 case X_READ: vio->mreq->op = X_MAPR; break;
356                 case X_WRITE: vio->mreq->op = X_MAPW; break;
357                 case X_INFO: vio->mreq->op = X_INFO; break;
358                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
359                 case X_OPEN: vio->mreq->op = X_OPEN; break;
360                 case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
361                 default: goto out_put;
362         }
363         xseg_set_req_data(peer->xseg, vio->mreq, pr);
364         __set_vio_state(vio, MAPPING);
365         p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
366         if (p == NoPort)
367                 goto out_unset;
368         r = xseg_signal(peer->xseg, p);
369         if (r < 0) {
370                 /* since submission is successful, just print a warning message */
371                 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
372         }
373
374         XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
375         return 0;
376
377 out_unset:
378         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
379 out_put:
380         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
381 out_err:
382         vio->err = 1;
383         XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
384         conclude_pr(peer, pr);
385         return -1;
386 }
387
388 static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
389 {
390         XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
391                         pr, vi, vi->name);
392         if (!vi->pending_reqs){
393                 //allocate 8 as default. FIXME make it relevant to nr_ops;
394                 vi->pending_reqs = allocate_queue(8);
395         }
396
397         if (!vi->pending_reqs){
398                 XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
399                                 vi->name);
400                 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
401                                 pr, vi, vi->name);
402                 return -1;
403         }
404
405         xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
406         if (r == Noneidx){
407                 if (doubleup_queue(vi) < 0){
408                         XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
409                                         pr, vi, vi->name);
410                         return -1;
411                 }
412                 r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
413         }
414
415         if (r == Noneidx){
416                 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
417                                 pr, vi, vi->name);
418                 return -1;
419         }
420
421         XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
422                         pr, vi, vi->name);
423         return 0;
424 }
425
426 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
427                                 struct xseg_request *req)
428 {
429         struct vlmc_io *vio = __get_vlmcio(pr);
430         struct vlmcd *vlmc = __get_vlmcd(peer);
431         char *target = xseg_get_target(peer->xseg, req);
432         struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
433         XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
434         if (!vi){
435                 vi = malloc(sizeof(struct volume_info));
436                 if (!vi){
437                         vio->err = 1;
438                         conclude_pr(peer, pr);
439                         return -1;
440                 }
441                 strncpy(vi->name, target, req->targetlen);
442                 vi->name[req->targetlen] = 0;
443                 vi->flags = 0;
444                 vi->pending_pr = NULL;
445                 vi->active_reqs = 0;
446                 vi->pending_reqs = 0;
447                 if (insert_volume(vlmc, vi) < 0){
448                         vio->err = 1;
449                         conclude_pr(peer, pr);
450                         free(vi);
451                         return -1;
452                 }
453         }
454
455         if (vi->flags & VF_VOLUME_FREEZED){
456                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
457                                 vi->name, vi);
458                 if (append_to_pending_reqs(vi, pr) < 0){
459                         vio->err = 1;
460                         conclude_pr(peer, pr);
461                         return -1;
462                 }
463                 return 0;
464         }
465
466         return do_accepted_pr(peer, pr);
467 }
468
469
470 static int mapping_info(struct peerd *peer, struct peer_req *pr)
471 {
472         struct vlmc_io *vio = __get_vlmcio(pr);
473         if (vio->mreq->state & XS_FAILED){
474                 XSEGLOG2(&lc, E, "Info req %lx failed",
475                                 (unsigned long)vio->mreq);
476                 vio->err = 1;
477         }
478         else {
479                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
480                 char *data = xseg_get_data(peer->xseg, pr->req);
481                 struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
482                 xreply->size = xinfo->size;
483         }
484         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
485         vio->mreq = NULL;
486         conclude_pr(peer, pr);
487         return 0;
488 }
489
490 static int mapping_open(struct peerd *peer, struct peer_req *pr)
491 {
492         struct vlmc_io *vio = __get_vlmcio(pr);
493         if (vio->mreq->state & XS_FAILED){
494                 XSEGLOG2(&lc, E, "Open req %lx failed",
495                                 (unsigned long)vio->mreq);
496                 vio->err = 1;
497         }
498         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
499         vio->mreq = NULL;
500         conclude_pr(peer, pr);
501         return 0;
502 }
503
504 static int mapping_close(struct peerd *peer, struct peer_req *pr)
505 {
506         struct vlmcd *vlmc = __get_vlmcd(peer);
507         struct vlmc_io *vio = __get_vlmcio(pr);
508         if (vio->mreq->state & XS_FAILED){
509                 XSEGLOG2(&lc, E, "Close req %lx failed",
510                                 (unsigned long)vio->mreq);
511                 vio->err = 1;
512         }
513         char *target = xseg_get_target(peer->xseg, pr->req);
514         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
515
516         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
517         vio->mreq = NULL;
518         conclude_pr(peer, pr);
519
520         //assert active_reqs == 1
521         //assert volume freezed
522         //unfreeze
523         if (!vi){
524                 XSEGLOG2(&lc, E, "Volume has not volume info");
525                 return 0;
526         }
527         vi->flags &= ~ VF_VOLUME_FREEZED;
528         if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
529                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
530                                 vi->name, vi);
531                 if (vi->pending_reqs)
532                         xq_free(vi->pending_reqs);
533                 remove_volume(vlmc, vi);
534                 free(vi);
535         }
536         else {
537                 xqindex xqi;
538                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
539                                 vi->name, vi);
540                 while (!(vi->flags & VF_VOLUME_FREEZED) &&
541                                 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
542                         struct peer_req *ppr = (struct peer_req *) xqi;
543                         do_accepted_pr(peer, ppr);
544                 }
545                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
546                                 vi->name, vi);
547         }
548         return 0;
549 }
550
551 static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
552 {
553         struct vlmcd *vlmc = __get_vlmcd(peer);
554         struct vlmc_io *vio = __get_vlmcio(pr);
555         char *target = xseg_get_target(peer->xseg, pr->req);
556         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
557         if (vio->mreq->state & XS_FAILED){
558                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
559                                 (unsigned long)vio->mreq, vio->mreq->op);
560                 vio->err = 1;
561         }
562         else {
563                 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
564                 char buf[XSEG_MAX_TARGETLEN];
565                 strncpy(buf, target, pr->req->targetlen);
566                 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
567                 if (r < 0) {
568                         XSEGLOG2(&lc, E, "Cannot resize request");
569                         vio->err = 1;
570                 }
571                 else {
572                         target = xseg_get_target(peer->xseg, pr->req);
573                         strncpy(target, buf, pr->req->targetlen);
574                         char *data = xseg_get_data(peer->xseg, pr->req);
575                         struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
576                         *xsnapshot = *xreply;
577                 }
578         }
579
580         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
581         vio->mreq = NULL;
582         conclude_pr(peer, pr);
583
584         //assert volume freezed
585         //unfreeze
586         if (!vi){
587                 XSEGLOG2(&lc, E, "Volume has no volume info");
588                 return 0;
589         }
590         XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
591         vi->flags &= ~ VF_VOLUME_FREEZED;
592
593         xqindex xqi;
594         while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
595                         (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
596                 struct peer_req *ppr = (struct peer_req *) xqi;
597                 do_accepted_pr(peer, ppr);
598         }
599         return 0;
600 }
601
602 static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
603 {
604         struct vlmcd *vlmc = __get_vlmcd(peer);
605         struct vlmc_io *vio = __get_vlmcio(pr);
606         struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
607         uint64_t pos, datalen, offset;
608         uint32_t targetlen;
609         struct xseg_request *breq;
610         char *target;
611         int i,r;
612         xport p;
613         if (vio->mreq->state & XS_FAILED){
614                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
615                                 (unsigned long)vio->mreq, vio->mreq->op);
616                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
617                 vio->mreq = NULL;
618                 vio->err = 1;
619                 conclude_pr(peer, pr);
620                 return 0;
621         }
622
623         if (!mreply || !mreply->cnt){
624                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
625                 vio->mreq = NULL;
626                 vio->err = 1;
627                 conclude_pr(peer, pr);
628                 return -1;
629         }
630
631         vio->breq_len = mreply->cnt;
632         vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
633         if (!vio->breqs) {
634                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
635                 vio->mreq = NULL;
636                 vio->err = 1;
637                 conclude_pr(peer, pr);
638                 return -1;
639         }
640
641         pos = 0;
642         __set_vio_state(vio, SERVING);
643         for (i = 0; i < vio->breq_len; i++) {
644                 datalen = mreply->segs[i].size;
645                 offset = mreply->segs[i].offset;
646                 targetlen = mreply->segs[i].targetlen;
647                 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
648                 if (!breq) {
649                         vio->err = 1;
650                         break;
651                 }
652                 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
653                 if (r < 0) {
654                         vio->err = 1;
655                         xseg_put_request(peer->xseg, breq, pr->portno);
656                         break;
657                 }
658                 breq->offset = offset;
659                 breq->size = datalen;
660                 breq->op = pr->req->op;
661                 target = xseg_get_target(peer->xseg, breq);
662                 if (!target) {
663                         vio->err = 1;
664                         xseg_put_request(peer->xseg, breq, pr->portno);
665                         break;
666                 }
667                 strncpy(target, mreply->segs[i].target, targetlen);
668                 r = xseg_set_req_data(peer->xseg, breq, pr);
669                 if (r<0) {
670                         vio->err = 1;
671                         xseg_put_request(peer->xseg, breq, pr->portno);
672                         break;
673                 }
674
675                 // this should work, right ?
676                 breq->data = pr->req->data + pos;
677                 pos += datalen;
678                 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
679                 if (p == NoPort){
680                         void *dummy;
681                         vio->err = 1;
682                         xseg_get_req_data(peer->xseg, breq, &dummy);
683                         xseg_put_request(peer->xseg, breq, pr->portno);
684                         break;
685                 }
686                 r = xseg_signal(peer->xseg, p);
687                 if (r < 0){
688                         XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
689                 }
690                 vio->breqs[i] = breq;
691         }
692         vio->breq_cnt = i;
693         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
694         vio->mreq = NULL;
695         if (i == 0) {
696                 free(vio->breqs);
697                 vio->breqs = NULL;
698                 vio->err = 1;
699                 conclude_pr(peer, pr);
700                 return -1;
701         }
702         return 0;
703 }
704
705 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
706                                 struct xseg_request *req)
707 {
708         struct vlmc_io *vio = __get_vlmcio(pr);
709
710         //assert vio>mreq == req
711         if (vio->mreq != req){
712                 XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
713                                 (unsigned long)vio->mreq, (unsigned long)req,
714                                 vio->state, (unsigned long)vio->breqs[0]);
715                 return -1;
716         }
717
718         switch (vio->mreq->op){
719                 case X_INFO:
720                         mapping_info(peer, pr);
721                         break;
722                 case X_SNAPSHOT:
723                         mapping_snapshot(peer, pr);
724                         break;
725                 case X_CLOSE:
726                         mapping_close(peer, pr);
727                         break;
728                 case X_OPEN:
729                         mapping_open(peer, pr);
730                         break;
731                 case X_MAPR:
732                 case X_MAPW:
733                         mapping_readwrite(peer, pr);
734                         break;
735                 default:
736                         XSEGLOG2(&lc, W, "Invalid mreq op");
737                         //vio->err = 1;
738                         //conclude_pr(peer, pr);
739                         break;
740         }
741
742         return 0;
743 }
744
745 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
746                                 struct xseg_request *req)
747 {
748         struct vlmc_io *vio = __get_vlmcio(pr);
749         struct vlmcd *vlmc = __get_vlmcd(peer);
750         (void)vlmc;
751         struct xseg_request *breq = req;
752
753         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
754                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
755                                 (unsigned long)req, req->op,
756                                 (unsigned long long)req->offset);
757                 vio->err = 1;
758         } else {
759                 //assert breq->serviced == breq->size
760                 pr->req->serviced += breq->serviced;
761         }
762         xseg_put_request(peer->xseg, breq, pr->portno);
763
764         if (!--vio->breq_cnt){
765                 __set_vio_state(vio, CONCLUDED);
766                 free(vio->breqs);
767                 vio->breqs = NULL;
768                 vio->breq_len = 0;
769                 conclude_pr(peer, pr);
770         }
771         return 0;
772 }
773
774 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
775                 enum dispatch_reason reason)
776 {
777         struct vlmc_io *vio = __get_vlmcio(pr);
778         struct vlmcd *vlmc = __get_vlmcd(peer);
779         (void)vlmc;
780
781         if (reason == dispatch_accept)
782                 //assert (pr->req == req)
783                 __set_vio_state(vio, ACCEPTED);
784
785         enum io_state_enum state = __get_vio_state(vio);
786         switch (state) {
787                 case ACCEPTED:
788                         handle_accepted(peer, pr, req);
789                         break;
790                 case MAPPING:
791                         handle_mapping(peer, pr, req);
792                         break;
793                 case SERVING:
794                         handle_serving(peer, pr, req);
795                         break;
796                 case CONCLUDED:
797                         XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
798                         break;
799                 default:
800                         XSEGLOG2(&lc, E, "wtf dude? invalid state");
801                         break;
802         }
803         return 0;
804 }
805
806
807 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
808 {
809         struct vlmc_io *vio;
810         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
811         int i, j;
812
813         if (!vlmc) {
814                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
815                 return -1;
816         }
817         peer->priv = (void *) vlmc;
818
819         vlmc->volumes = xhash_new(3, STRING);
820         if (!vlmc->volumes){
821                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
822                 return -1;
823         }
824         vlmc->mportno = NoPort;
825         vlmc->bportno = NoPort;
826
827         BEGIN_READ_ARGS(argc, argv);
828         READ_ARG_ULONG("-mp", vlmc->mportno);
829         READ_ARG_ULONG("-bp", vlmc->bportno);
830         END_READ_ARGS();
831
832         if (vlmc->bportno == NoPort) {
833                 XSEGLOG2(&lc, E, "bportno must be provided");
834                 usage(argv[0]);
835                 return -1;
836         }
837         if (vlmc->mportno == NoPort) {
838                 XSEGLOG2(&lc, E, "mportno must be provided");
839                 usage(argv[0]);
840                 return -1;
841         }
842
843         for (i = 0; i < peer->nr_ops; i++) {
844                 vio = malloc(sizeof(struct vlmc_io));
845                 if (!vio) {
846                         break;
847                 }
848                 vio->mreq = NULL;
849                 vio->breqs = NULL;
850                 vio->breq_cnt = 0;
851                 vio->breq_len = 0;
852                 xlock_release(&vio->lock);
853                 peer->peer_reqs[i].priv = (void *) vio;
854         }
855         if (i < peer->nr_ops) {
856                 for (j = 0; j < i; j++) {
857                         free(peer->peer_reqs[i].priv);
858                 }
859                 return -1;
860         }
861
862
863         const struct sched_param param = { .sched_priority = 99 };
864         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
865
866         return 0;
867 }
868
869 void custom_peer_finalize(struct peerd *peer)
870 {
871         return;
872 }