Complete unification of thread_loop/peerd_loop
[archipelago] / xseg / peers / user / mt-vlmcd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <xseg/protocol.h>
40 #include <peer.h>
41 #include <sched.h>
42 #include <sys/syscall.h>
43
44 enum io_state_enum {
45         ACCEPTED = 0,
46         MAPPING = 1,
47         SERVING = 2,
48         CONCLUDED = 3
49 };
50
51 #define VF_VOLUME_FREEZED (1 << 0)
52
53 struct volume_info{
54         char name[XSEG_MAX_TARGETLEN + 1];
55         uint32_t flags;
56         uint32_t active_reqs;
57         struct xq *pending_reqs;
58         struct peer_req *pending_pr;
59 };
60
61 struct vlmcd {
62         xport mportno;
63         xport bportno;
64         xhash_t *volumes; //hash [volumename] -> struct volume_info
65 };
66
67 struct vlmc_io {
68         int err;
69         struct xlock lock;
70         volatile enum io_state_enum state;
71         struct xseg_request *mreq;
72         struct xseg_request **breqs;
73         unsigned long breq_len, breq_cnt;
74 };
75
76 void custom_peer_usage()
77 {
78         fprintf(stderr, "Custom peer options: \n"
79                         "-mp : mapper port\n"
80                         "-bp : blocker port for blocks\n"
81                         "\n");
82 }
83
84 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
85 {
86         vio->state = state;
87 }
88
89 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
90 {
91         enum io_state_enum state;
92         state = vio->state;
93         return state;
94 }
95
96 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
97 {
98         return (struct vlmc_io *) pr->priv;
99 }
100
101 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
102 {
103         return (struct vlmcd *) peer->priv;
104 }
105
106 static struct xq * allocate_queue(xqindex nr)
107 {
108         struct xq *q = malloc(sizeof(struct xq));
109         if (!q)
110                 return NULL;
111         if (!xq_alloc_empty(q, nr)){
112                 free(q);
113                 return NULL;
114         }
115         return q;
116 }
117
118 static int doubleup_queue(struct volume_info *vi)
119 {
120         //assert vi->pending_reqs
121         XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
122         struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
123         if (!newq){
124                 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
125                                 vi->name);
126                 return -1;
127         }
128
129         if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
130                 xq_free(newq);
131                 free(newq);
132                 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
133                                 vi->name);
134                 return -1;
135         }
136         xq_free(vi->pending_reqs);
137         free(vi->pending_reqs);
138         vi->pending_reqs = newq;
139         XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
140         return 0;
141 }
142
143 static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
144 {
145         struct volume_info *vi = NULL;
146         XSEGLOG2(&lc, D, "looking up volume %s", volume);
147         int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
148                         (xhashidx *) &vi);
149         if (r < 0){
150                 XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
151                 return NULL;
152         }
153         XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
154                         volume, (unsigned long)vi);
155         return vi;
156 }
157
158 static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
159                                                 uint32_t targetlen)
160 {
161         char buf[XSEG_MAX_TARGETLEN+1];
162         strncpy(buf, target, targetlen);
163         buf[targetlen] = 0;
164         XSEGLOG2(&lc, D, "looking up volume %s, len %u",
165                         buf, targetlen);
166         return find_volume(vlmc, buf);
167
168 }
169
170 static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
171 {
172         int r = -1;
173
174         if (find_volume(vlmc, vi->name)){
175                 XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
176                 return r;
177         }
178
179         XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)", 
180                         vi->name, strlen(vi->name), (unsigned long) vi);
181         r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
182         while (r == -XHASH_ERESIZE) {
183                 xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
184                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
185                 if (!new_hashmap){
186                         XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
187                                         (unsigned long long) shift);
188                         return r;
189                 }
190                 vlmc->volumes = new_hashmap;
191                 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
192         }
193         XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed", 
194                         vi->name, strlen(vi->name), (unsigned long) vi);
195
196         return r;
197
198 }
199
200 static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
201 {
202         int r = -1;
203
204         XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)", 
205                         vi->name, strlen(vi->name), (unsigned long) vi);
206         r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
207         while (r == -XHASH_ERESIZE) {
208                 xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
209                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
210                 if (!new_hashmap){
211                         XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
212                                         (unsigned long long) shift);
213                         XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed", 
214                                         vi->name, (unsigned long) vi);
215                         return r;
216                 }
217                 vlmc->volumes = new_hashmap;
218                 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
219         }
220         if (r < 0)
221                 XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed", 
222                         vi->name, strlen(vi->name), (unsigned long) vi);
223         else
224                 XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed", 
225                         vi->name, strlen(vi->name), (unsigned long) vi);
226         return r;
227 }
228
229 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
230
231 static int conclude_pr(struct peerd *peer, struct peer_req *pr)
232 {
233         struct vlmcd *vlmc = __get_vlmcd(peer);
234         struct vlmc_io *vio = __get_vlmcio(pr);
235         char *target = xseg_get_target(peer->xseg, pr->req);
236         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
237
238         XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);
239
240         __set_vio_state(vio, CONCLUDED);
241         if (vio->err)
242                 fail(peer, pr);
243         else
244                 complete(peer, pr);
245
246         if (vi){
247                 //assert vi->active_reqs > 0
248                 uint32_t ar = --vi->active_reqs;
249                 XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
250                                 vi, vi->name, ar, vi->pending_pr);
251                 if (!ar && vi->pending_pr)
252                         do_accepted_pr(peer, vi->pending_pr);
253         }
254         XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
255         return 0;
256 }
257
258 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
259 {
260         struct vlmcd *vlmc = __get_vlmcd(peer);
261         struct vlmc_io *vio = __get_vlmcio(pr);
262         int r;
263         xport p;
264         char *target, *mtarget;
265         void *dummy;
266
267         struct volume_info *vi;
268
269         XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
270         target = xseg_get_target(peer->xseg, pr->req);
271         if (!target){
272                 vio->err = 1;
273                 conclude_pr(peer, pr);
274                 return -1;
275         }
276
277         vi = find_volume_len(vlmc, target, pr->req->targetlen);
278         if (!vi){
279                 XSEGLOG2(&lc, E, "Cannot find volume");
280                 XSEGLOG2(&lc, E, "Pr %lx", pr);
281                 vio->err = 1;
282                 conclude_pr(peer, pr);
283                 return -1;
284         }
285
286         if (pr->req->op == X_CLOSE || pr->req->op == X_SNAPSHOT){
287                 XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
288                 vi->flags |= VF_VOLUME_FREEZED;
289                 if (vi->active_reqs){
290                         //assert vi->pending_pr == NULL;
291                         XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
292                                         vi->name, vi->active_reqs, pr);
293                         vi->pending_pr = pr;
294                         return 0;
295                 }
296                 else {
297                         XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
298                                         vi->name);
299                         //assert vi->pending_pr == pr
300                         vi->pending_pr = NULL;
301                 }
302         }
303
304         vi->active_reqs++;
305
306         vio->err = 0; //reset error state
307
308         if (pr->req->op == X_WRITE && !pr->req->size &&
309                         (pr->req->flags & (XF_FLUSH|XF_FUA))){
310                 //hanlde flush requests here, so we don't mess with mapper
311                 //because of the -1 offset
312                 XSEGLOG2(&lc, I, "Completing flush request");
313                 pr->req->serviced = pr->req->size;
314                 conclude_pr(peer, pr);
315                 return 0;
316         }
317
318         vio->mreq = xseg_get_request(peer->xseg, pr->portno,
319                                         vlmc->mportno, X_ALLOC);
320         if (!vio->mreq)
321                 goto out_err;
322
323         /* use datalen 0. let mapper allocate buffer space as needed */
324         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
325         if (r < 0) {
326                 XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
327                                 vio->mreq, pr, vi->name);
328                 goto out_put;
329         }
330         mtarget = xseg_get_target(peer->xseg, vio->mreq);
331         if (!mtarget)
332                 goto out_put;
333
334         strncpy(mtarget, target, pr->req->targetlen);
335         vio->mreq->size = pr->req->size;
336         vio->mreq->offset = pr->req->offset;
337         vio->mreq->flags = 0;
338         switch (pr->req->op) {
339                 case X_READ: vio->mreq->op = X_MAPR; break;
340                 case X_WRITE: vio->mreq->op = X_MAPW; break;
341                 case X_INFO: vio->mreq->op = X_INFO; break;
342                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
343                 case X_OPEN: vio->mreq->op = X_OPEN; break;
344                 case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
345                 default: goto out_put;
346         }
347         xseg_set_req_data(peer->xseg, vio->mreq, pr);
348         __set_vio_state(vio, MAPPING);
349         p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
350         if (p == NoPort)
351                 goto out_unset;
352         r = xseg_signal(peer->xseg, p);
353         if (r < 0) {
354                 /* since submission is successful, just print a warning message */
355                 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
356         }
357
358         XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
359         return 0;
360
361 out_unset:
362         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
363 out_put:
364         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
365 out_err:
366         vio->err = 1;
367         XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
368         conclude_pr(peer, pr);
369         return -1;
370 }
371
372 static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
373 {
374         XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
375                         pr, vi, vi->name);
376         if (!vi->pending_reqs){
377                 //allocate 8 as default. FIXME make it relevant to nr_ops;
378                 vi->pending_reqs = allocate_queue(8);
379         }
380
381         if (!vi->pending_reqs){
382                 XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
383                                 vi->name);
384                 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
385                                 pr, vi, vi->name);
386                 return -1;
387         }
388
389         xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
390         if (r == Noneidx){
391                 if (doubleup_queue(vi) < 0){
392                         XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
393                                         pr, vi, vi->name);
394                         return -1;
395                 }
396                 r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
397         }
398
399         if (r == Noneidx){
400                 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
401                                 pr, vi, vi->name);
402                 return -1;
403         }
404
405         XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
406                         pr, vi, vi->name);
407         return 0;
408 }
409
410 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
411                                 struct xseg_request *req)
412 {
413         struct vlmc_io *vio = __get_vlmcio(pr);
414         struct vlmcd *vlmc = __get_vlmcd(peer);
415         char *target = xseg_get_target(peer->xseg, req);
416         struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
417         XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
418         if (!vi){
419                 vi = malloc(sizeof(struct volume_info));
420                 if (!vi){
421                         vio->err = 1;
422                         conclude_pr(peer, pr);
423                         return -1;
424                 }
425                 strncpy(vi->name, target, req->targetlen);
426                 vi->name[req->targetlen] = 0;
427                 vi->flags = 0;
428                 vi->pending_pr = NULL;
429                 vi->active_reqs = 0;
430                 vi->pending_reqs = 0;
431                 if (insert_volume(vlmc, vi) < 0){
432                         vio->err = 1;
433                         conclude_pr(peer, pr);
434                         free(vi);
435                         return -1;
436                 }
437         }
438
439         if (vi->flags & VF_VOLUME_FREEZED){
440                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
441                                 vi->name, vi);
442                 if (append_to_pending_reqs(vi, pr) < 0){
443                         vio->err = 1;
444                         conclude_pr(peer, pr);
445                         return -1;
446                 }
447                 return 0;
448         }
449
450         return do_accepted_pr(peer, pr);
451 }
452
453
454 static int mapping_info(struct peerd *peer, struct peer_req *pr)
455 {
456         struct vlmc_io *vio = __get_vlmcio(pr);
457         if (vio->mreq->state & XS_FAILED){
458                 XSEGLOG2(&lc, E, "Info req %lx failed",
459                                 (unsigned long)vio->mreq);
460                 vio->err = 1;
461         }
462         else {
463                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
464                 char *data = xseg_get_data(peer->xseg, pr->req);
465                 struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
466                 xreply->size = xinfo->size;
467         }
468         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
469         vio->mreq = NULL;
470         conclude_pr(peer, pr);
471         return 0;
472 }
473
474 static int mapping_open(struct peerd *peer, struct peer_req *pr)
475 {
476         struct vlmc_io *vio = __get_vlmcio(pr);
477         if (vio->mreq->state & XS_FAILED){
478                 XSEGLOG2(&lc, E, "Open req %lx failed",
479                                 (unsigned long)vio->mreq);
480                 vio->err = 1;
481         }
482         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
483         vio->mreq = NULL;
484         conclude_pr(peer, pr);
485         return 0;
486 }
487
488 static int mapping_close(struct peerd *peer, struct peer_req *pr)
489 {
490         struct vlmcd *vlmc = __get_vlmcd(peer);
491         struct vlmc_io *vio = __get_vlmcio(pr);
492         if (vio->mreq->state & XS_FAILED){
493                 XSEGLOG2(&lc, E, "Close req %lx failed",
494                                 (unsigned long)vio->mreq);
495                 vio->err = 1;
496         }
497         char *target = xseg_get_target(peer->xseg, pr->req);
498         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
499
500         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
501         vio->mreq = NULL;
502         conclude_pr(peer, pr);
503
504         //assert active_reqs == 1
505         //assert volume freezed
506         //unfreeze
507         if (!vi){
508                 XSEGLOG2(&lc, E, "Volume has not volume info");
509                 return 0;
510         }
511         vi->flags &= ~ VF_VOLUME_FREEZED;
512         if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
513                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
514                                 vi->name, vi);
515                 if (vi->pending_reqs)
516                         xq_free(vi->pending_reqs);
517                 remove_volume(vlmc, vi);
518                 free(vi);
519         }
520         else {
521                 xqindex xqi;
522                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
523                                 vi->name, vi);
524                 while (!(vi->flags & VF_VOLUME_FREEZED) &&
525                                 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
526                         struct peer_req *ppr = (struct peer_req *) xqi;
527                         do_accepted_pr(peer, ppr);
528                 }
529                 XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
530                                 vi->name, vi);
531         }
532         return 0;
533 }
534
535 static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
536 {
537         struct vlmcd *vlmc = __get_vlmcd(peer);
538         struct vlmc_io *vio = __get_vlmcio(pr);
539         char *target = xseg_get_target(peer->xseg, pr->req);
540         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
541         if (vio->mreq->state & XS_FAILED){
542                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
543                                 (unsigned long)vio->mreq, vio->mreq->op);
544                 vio->err = 1;
545         }
546         else {
547                 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
548                 char buf[XSEG_MAX_TARGETLEN];
549                 strncpy(buf, target, pr->req->targetlen);
550                 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
551                 if (r < 0) {
552                         XSEGLOG2(&lc, E, "Cannot resize request");
553                         vio->err = 1;
554                 }
555                 else {
556                         target = xseg_get_target(peer->xseg, pr->req);
557                         strncpy(target, buf, pr->req->targetlen);
558                         char *data = xseg_get_data(peer->xseg, pr->req);
559                         struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
560                         *xsnapshot = *xreply;
561                 }
562         }
563
564         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
565         vio->mreq = NULL;
566         conclude_pr(peer, pr);
567
568         //assert volume freezed
569         //unfreeze
570         if (!vi){
571                 XSEGLOG2(&lc, E, "Volume has no volume info");
572                 return 0;
573         }
574         XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
575         vi->flags &= ~ VF_VOLUME_FREEZED;
576
577         xqindex xqi;
578         while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
579                         (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
580                 struct peer_req *ppr = (struct peer_req *) xqi;
581                 do_accepted_pr(peer, ppr);
582         }
583         return 0;
584 }
585
586 static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
587 {
588         struct vlmcd *vlmc = __get_vlmcd(peer);
589         struct vlmc_io *vio = __get_vlmcio(pr);
590         struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
591         uint64_t pos, datalen, offset;
592         uint32_t targetlen;
593         struct xseg_request *breq;
594         char *target;
595         int i,r;
596         xport p;
597         if (vio->mreq->state & XS_FAILED){
598                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
599                                 (unsigned long)vio->mreq, vio->mreq->op);
600                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
601                 vio->mreq = NULL;
602                 vio->err = 1;
603                 conclude_pr(peer, pr);
604                 return 0;
605         }
606
607         if (!mreply || !mreply->cnt){
608                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
609                 vio->mreq = NULL;
610                 vio->err = 1;
611                 conclude_pr(peer, pr);
612                 return -1;
613         }
614
615         vio->breq_len = mreply->cnt;
616         vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
617         if (!vio->breqs) {
618                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
619                 vio->mreq = NULL;
620                 vio->err = 1;
621                 conclude_pr(peer, pr);
622                 return -1;
623         }
624
625         pos = 0;
626         __set_vio_state(vio, SERVING);
627         for (i = 0; i < vio->breq_len; i++) {
628                 datalen = mreply->segs[i].size;
629                 offset = mreply->segs[i].offset;
630                 targetlen = mreply->segs[i].targetlen;
631                 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
632                 if (!breq) {
633                         vio->err = 1;
634                         break;
635                 }
636                 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
637                 if (r < 0) {
638                         vio->err = 1;
639                         xseg_put_request(peer->xseg, breq, pr->portno);
640                         break;
641                 }
642                 breq->offset = offset;
643                 breq->size = datalen;
644                 breq->op = pr->req->op;
645                 target = xseg_get_target(peer->xseg, breq);
646                 if (!target) {
647                         vio->err = 1;
648                         xseg_put_request(peer->xseg, breq, pr->portno);
649                         break;
650                 }
651                 strncpy(target, mreply->segs[i].target, targetlen);
652                 r = xseg_set_req_data(peer->xseg, breq, pr);
653                 if (r<0) {
654                         vio->err = 1;
655                         xseg_put_request(peer->xseg, breq, pr->portno);
656                         break;
657                 }
658
659                 // this should work, right ?
660                 breq->data = pr->req->data + pos;
661                 pos += datalen;
662                 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
663                 if (p == NoPort){
664                         void *dummy;
665                         vio->err = 1;
666                         xseg_get_req_data(peer->xseg, breq, &dummy);
667                         xseg_put_request(peer->xseg, breq, pr->portno);
668                         break;
669                 }
670                 r = xseg_signal(peer->xseg, p);
671                 if (r < 0){
672                         XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
673                 }
674                 vio->breqs[i] = breq;
675         }
676         vio->breq_cnt = i;
677         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
678         vio->mreq = NULL;
679         if (i == 0) {
680                 free(vio->breqs);
681                 vio->breqs = NULL;
682                 vio->err = 1;
683                 conclude_pr(peer, pr);
684                 return -1;
685         }
686         return 0;
687 }
688
689 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
690                                 struct xseg_request *req)
691 {
692         struct vlmc_io *vio = __get_vlmcio(pr);
693
694         //assert vio>mreq == req
695         if (vio->mreq != req){
696                 XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
697                                 (unsigned long)vio->mreq, (unsigned long)req,
698                                 vio->state, (unsigned long)vio->breqs[0]);
699                 return -1;
700         }
701
702         switch (vio->mreq->op){
703                 case X_INFO:
704                         mapping_info(peer, pr);
705                         break;
706                 case X_SNAPSHOT:
707                         mapping_snapshot(peer, pr);
708                         break;
709                 case X_CLOSE:
710                         mapping_close(peer, pr);
711                         break;
712                 case X_OPEN:
713                         mapping_open(peer, pr);
714                         break;
715                 case X_MAPR:
716                 case X_MAPW:
717                         mapping_readwrite(peer, pr);
718                         break;
719                 default:
720                         XSEGLOG2(&lc, W, "Invalid mreq op");
721                         //vio->err = 1;
722                         //conclude_pr(peer, pr);
723                         break;
724         }
725
726         return 0;
727 }
728
729 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
730                                 struct xseg_request *req)
731 {
732         struct vlmc_io *vio = __get_vlmcio(pr);
733         struct vlmcd *vlmc = __get_vlmcd(peer);
734         (void)vlmc;
735         struct xseg_request *breq = req;
736
737         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
738                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
739                                 (unsigned long)req, req->op,
740                                 (unsigned long long)req->offset);
741                 vio->err = 1;
742         } else {
743                 //assert breq->serviced == breq->size
744                 pr->req->serviced += breq->serviced;
745         }
746         xseg_put_request(peer->xseg, breq, pr->portno);
747
748         if (!--vio->breq_cnt){
749                 __set_vio_state(vio, CONCLUDED);
750                 free(vio->breqs);
751                 vio->breqs = NULL;
752                 vio->breq_len = 0;
753                 conclude_pr(peer, pr);
754         }
755         return 0;
756 }
757
758 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
759                 enum dispatch_reason reason)
760 {
761         struct vlmc_io *vio = __get_vlmcio(pr);
762         struct vlmcd *vlmc = __get_vlmcd(peer);
763         (void)vlmc;
764
765         if (reason == dispatch_accept)
766                 //assert (pr->req == req)
767                 __set_vio_state(vio, ACCEPTED);
768
769         enum io_state_enum state = __get_vio_state(vio);
770         switch (state) {
771                 case ACCEPTED:
772                         handle_accepted(peer, pr, req);
773                         break;
774                 case MAPPING:
775                         handle_mapping(peer, pr, req);
776                         break;
777                 case SERVING:
778                         handle_serving(peer, pr, req);
779                         break;
780                 case CONCLUDED:
781                         XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
782                         break;
783                 default:
784                         XSEGLOG2(&lc, E, "wtf dude? invalid state");
785                         break;
786         }
787         return 0;
788 }
789
790
791 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
792 {
793         struct vlmc_io *vio;
794         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
795         int i, j;
796
797         if (!vlmc) {
798                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
799                 return -1;
800         }
801         peer->priv = (void *) vlmc;
802
803         vlmc->volumes = xhash_new(3, STRING);
804         if (!vlmc->volumes){
805                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
806                 return -1;
807         }
808         vlmc->mportno = NoPort;
809         vlmc->bportno = NoPort;
810
811         BEGIN_READ_ARGS(argc, argv);
812         READ_ARG_ULONG("-mp", vlmc->mportno);
813         READ_ARG_ULONG("-bp", vlmc->bportno);
814         END_READ_ARGS();
815
816         if (vlmc->bportno == NoPort) {
817                 XSEGLOG2(&lc, E, "bportno must be provided");
818                 usage(argv[0]);
819                 return -1;
820         }
821         if (vlmc->mportno == NoPort) {
822                 XSEGLOG2(&lc, E, "mportno must be provided");
823                 usage(argv[0]);
824                 return -1;
825         }
826
827         for (i = 0; i < peer->nr_ops; i++) {
828                 vio = malloc(sizeof(struct vlmc_io));
829                 if (!vio) {
830                         break;
831                 }
832                 vio->mreq = NULL;
833                 vio->breqs = NULL;
834                 vio->breq_cnt = 0;
835                 vio->breq_len = 0;
836                 xlock_release(&vio->lock);
837                 peer->peer_reqs[i].priv = (void *) vio;
838         }
839         if (i < peer->nr_ops) {
840                 for (j = 0; j < i; j++) {
841                         free(peer->peer_reqs[i].priv);
842                 }
843                 return -1;
844         }
845
846
847         const struct sched_param param = { .sched_priority = 99 };
848         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
849
850         return 0;
851 }
852
853 void custom_peer_finalize(struct peerd *peer)
854 {
855         return;
856 }