d29306e2a83bff0d25336a1ca457191b97f4754f
[archipelago] / xseg / peers / user / mt-vlmcd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <unistd.h>
38 #include <xseg/xseg.h>
39 #include <xseg/protocol.h>
40 #include <peer.h>
41 #include <sched.h>
42 #include <sys/syscall.h>
43
44 enum io_state_enum {
45         ACCEPTED = 0,
46         MAPPING = 1,
47         SERVING = 2,
48         CONCLUDED = 3
49 };
50
51 #define VF_VOLUME_FREEZED (1 << 0)
52
53 struct volume_info{
54         char name[XSEG_MAX_TARGETLEN + 1];
55         uint32_t flags;
56         uint32_t active_reqs;
57         struct xq *pending_reqs;
58         struct peer_req *pending_pr;
59 };
60
61 struct vlmcd {
62         xport mportno;
63         xport bportno;
64         xhash_t *volumes; //hash [volumename] -> struct volume_info
65 };
66
67 struct vlmc_io {
68         int err;
69         struct xlock lock;
70         volatile enum io_state_enum state;
71         struct xseg_request *mreq;
72         struct xseg_request **breqs;
73         unsigned long breq_len, breq_cnt;
74 };
75
76 void custom_peer_usage()
77 {
78         fprintf(stderr, "Custom peer options: \n"
79                         "-mp : mapper port\n"
80                         "-bp : blocker port for blocks\n"
81                         "\n");
82 }
83
84 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
85 {
86         vio->state = state;
87 }
88
89 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
90 {
91         enum io_state_enum state;
92         state = vio->state;
93         return state;
94 }
95
96 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
97 {
98         return (struct vlmc_io *) pr->priv;
99 }
100
101 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
102 {
103         return (struct vlmcd *) peer->priv;
104 }
105
106 static struct xq * allocate_queue(xqindex nr)
107 {
108         struct xq *q = malloc(sizeof(struct xq));
109         if (!q)
110                 return NULL;
111         if (!xq_alloc_empty(q, nr)){
112                 free(q);
113                 return NULL;
114         }
115         return q;
116 }
117
118 static int doubleup_queue(struct volume_info *vi)
119 {
120         //assert vi->pending_reqs
121         struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
122         if (!newq)
123                 return -1;
124
125         if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
126                 xq_free(newq);
127                 free(newq);
128                 return -1;
129         }
130         xq_free(vi->pending_reqs);
131         free(vi->pending_reqs);
132         vi->pending_reqs = newq;
133         return 0;
134 }
135
136 static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
137 {
138         struct volume_info *vi = NULL;
139         int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
140                         (xhashidx *) &vi);
141         if (r < 0)
142                 return NULL;
143         return vi;
144 }
145
146 static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
147                                                 uint32_t targetlen)
148 {
149         char buf[XSEG_MAX_TARGETLEN+1];
150         strncpy(buf, target, targetlen);
151         buf[targetlen] = 0;
152         XSEGLOG2(&lc, D, "looking up volume %s, len %u",
153                         buf, targetlen);
154         return find_volume(vlmc, buf);
155
156 }
157
158 static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
159 {
160         int r = -1;
161
162         if (find_volume(vlmc, vi->name)){
163                 XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
164                 return r;
165         }
166
167         XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)", 
168                         vi->name, strlen(vi->name), (unsigned long) vi);
169         r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
170         while (r == -XHASH_ERESIZE) {
171                 xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
172                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
173                 if (!new_hashmap){
174                         XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
175                                         (unsigned long long) shift);
176                         return r;
177                 }
178                 vlmc->volumes = new_hashmap;
179                 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
180         }
181
182         return r;
183
184 }
185
186 static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
187 {
188         int r = -1;
189
190         r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
191         while (r == -XHASH_ERESIZE) {
192                 xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
193                 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
194                 if (!new_hashmap){
195                         XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
196                                         (unsigned long long) shift);
197                         return r;
198                 }
199                 vlmc->volumes = new_hashmap;
200                 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
201         }
202
203         return r;
204 }
205
206 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
207
208 static int conclude_pr(struct peerd *peer, struct peer_req *pr)
209 {
210         struct vlmcd *vlmc = __get_vlmcd(peer);
211         struct vlmc_io *vio = __get_vlmcio(pr);
212         char *target = xseg_get_target(peer->xseg, pr->req);
213         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
214
215         __set_vio_state(vio, CONCLUDED);
216         if (vio->err)
217                 fail(peer, pr);
218         else
219                 complete(peer, pr);
220
221         if (vi){
222                 //assert vi->active_reqs > 0
223                 uint32_t ar = --vi->active_reqs;
224                 if (!ar && vi->pending_pr)
225                         do_accepted_pr(peer, vi->pending_pr);
226         }
227         return 0;
228 }
229
230 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
231 {
232         struct vlmcd *vlmc = __get_vlmcd(peer);
233         struct vlmc_io *vio = __get_vlmcio(pr);
234         int r;
235         xport p;
236         char *target, *mtarget;
237         void *dummy;
238
239         struct volume_info *vi;
240
241         target = xseg_get_target(peer->xseg, pr->req);
242         if (!target)
243                 goto out_err;
244
245         vi = find_volume_len(vlmc, target, pr->req->targetlen);
246         if (!vi){
247                 XSEGLOG2(&lc, E, "Cannot find volume");
248                 goto out_err;
249         }
250
251         if (pr->req->op == X_CLOSE || pr->req->op == X_SNAPSHOT){
252                 vi->flags |= VF_VOLUME_FREEZED;
253                 if (vi->active_reqs){
254                         //assert vi->pending_pr == NULL;
255                         vi->pending_pr = pr;
256                         return 0;
257                 }
258                 else {
259                         //assert vi->pending_pr == pr
260                         vi->pending_pr = NULL;
261                 }
262         }
263
264         vi->active_reqs++;
265
266         vio->err = 0; //reset error state
267
268         if (pr->req->op == X_WRITE && !pr->req->size &&
269                         (pr->req->flags & (XF_FLUSH|XF_FUA))){
270                 //hanlde flush requests here, so we don't mess with mapper
271                 //because of the -1 offset
272                 XSEGLOG2(&lc, I, "Completing flush request");
273                 pr->req->serviced = pr->req->size;
274                 conclude_pr(peer, pr);
275                 return 0;
276         }
277
278         vio->mreq = xseg_get_request(peer->xseg, pr->portno,
279                                         vlmc->mportno, X_ALLOC);
280         if (!vio->mreq)
281                 goto out_err;
282
283         /* use datalen 0. let mapper allocate buffer space as needed */
284         r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
285         if (r < 0) {
286                 goto out_put;
287         }
288         mtarget = xseg_get_target(peer->xseg, vio->mreq);
289         if (!mtarget)
290                 goto out_put;
291
292         strncpy(mtarget, target, pr->req->targetlen);
293         vio->mreq->size = pr->req->size;
294         vio->mreq->offset = pr->req->offset;
295         vio->mreq->flags = 0;
296         switch (pr->req->op) {
297                 case X_READ: vio->mreq->op = X_MAPR; break;
298                 case X_WRITE: vio->mreq->op = X_MAPW; break;
299                 case X_INFO: vio->mreq->op = X_INFO; break;
300                 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
301                 case X_OPEN: vio->mreq->op = X_OPEN; break;
302                 case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
303                 default: goto out_put;
304         }
305         xseg_set_req_data(peer->xseg, vio->mreq, pr);
306         __set_vio_state(vio, MAPPING);
307         p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
308         if (p == NoPort)
309                 goto out_unset;
310         r = xseg_signal(peer->xseg, p);
311         if (r < 0) {
312                 /* since submission is successful, just print a warning message */
313                 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
314         }
315
316         return 0;
317
318 out_unset:
319         xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
320 out_put:
321         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
322 out_err:
323         vio->err = 1;
324         conclude_pr(peer, pr);
325         return -1;
326 }
327
328 static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
329 {
330         if (!vi->pending_reqs){
331                 //allocate 8 as default. FIXME make it relevant to nr_ops;
332                 vi->pending_reqs = allocate_queue(8);
333         }
334
335         if (!vi->pending_reqs){
336                 XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
337                                 vi->name);
338                 return -1;
339         }
340
341         xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
342         if (r == Noneidx){
343                 if (doubleup_queue(vi) < 0)
344                         return -1;
345                 r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
346         }
347
348         if (r == Noneidx)
349                 return -1;
350
351         return 0;
352 }
353
354 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
355                                 struct xseg_request *req)
356 {
357         struct vlmc_io *vio = __get_vlmcio(pr);
358         struct vlmcd *vlmc = __get_vlmcd(peer);
359         char *target = xseg_get_target(peer->xseg, req);
360         struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen); 
361         if (!vi){
362                 vi = malloc(sizeof(struct volume_info));
363                 if (!vi){
364                         vio->err = 1;
365                         conclude_pr(peer, pr);
366                         return -1;
367                 }
368                 strncpy(vi->name, target, req->targetlen);
369                 vi->name[req->targetlen] = 0;
370                 vi->flags = 0;
371                 vi->pending_pr = NULL;
372                 vi->active_reqs = 0;
373                 vi->pending_reqs = 0;
374                 if (insert_volume(vlmc, vi) < 0){
375                         vio->err = 1;
376                         conclude_pr(peer, pr);
377                         free(vi);
378                         return -1;
379                 }
380         }
381
382         if (vi->flags & VF_VOLUME_FREEZED){
383                 if (append_to_pending_reqs(vi, pr) < 0){
384                         vio->err = 1;
385                         conclude_pr(peer, pr);
386                         return -1;
387                 }
388                 return 0;
389         }
390
391         return do_accepted_pr(peer, pr);
392 }
393
394
395 static int mapping_info(struct peerd *peer, struct peer_req *pr)
396 {
397         struct vlmc_io *vio = __get_vlmcio(pr);
398         if (vio->mreq->state & XS_FAILED){
399                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
400                                 (unsigned long)vio->mreq, vio->mreq->op);
401                 vio->err = 1;
402         }
403         else {
404                 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
405                 char *data = xseg_get_data(peer->xseg, pr->req);
406                 *(uint64_t *)data = xinfo->size;
407         }
408         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
409         vio->mreq = NULL;
410         conclude_pr(peer, pr);
411         return 0;
412 }
413
414 static int mapping_open(struct peerd *peer, struct peer_req *pr)
415 {
416         struct vlmc_io *vio = __get_vlmcio(pr);
417         if (vio->mreq->state & XS_FAILED){
418                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
419                                 (unsigned long)vio->mreq, vio->mreq->op);
420                 vio->err = 1;
421         }
422         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
423         vio->mreq = NULL;
424         conclude_pr(peer, pr);
425         return 0;
426 }
427
428 static int mapping_close(struct peerd *peer, struct peer_req *pr)
429 {
430         struct vlmcd *vlmc = __get_vlmcd(peer);
431         struct vlmc_io *vio = __get_vlmcio(pr);
432         if (vio->mreq->state & XS_FAILED){
433                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
434                                 (unsigned long)vio->mreq, vio->mreq->op);
435                 vio->err = 1;
436         }
437         char *target = xseg_get_target(peer->xseg, pr->req);
438         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
439
440         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
441         vio->mreq = NULL;
442         conclude_pr(peer, pr);
443
444         //assert active_reqs == 1
445         //assert volume freezed
446         //unfreeze
447         if (!vi){
448                 XSEGLOG2(&lc, E, "Volume has not volume info");
449                 return 0;
450         }
451         vi->flags &= ~ VF_VOLUME_FREEZED;
452         if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
453                 if (vi->pending_reqs)
454                         xq_free(vi->pending_reqs);
455                 remove_volume(vlmc, vi);
456                 free(vi);
457         }
458         else {
459                 xqindex xqi;
460                 while (!(vi->flags & VF_VOLUME_FREEZED) &&
461                                 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
462                         struct peer_req *ppr = (struct peer_req *) xqi;
463                         do_accepted_pr(peer, ppr);
464                 }
465         }
466         return 0;
467 }
468
469 static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
470 {
471         struct vlmcd *vlmc = __get_vlmcd(peer);
472         struct vlmc_io *vio = __get_vlmcio(pr);
473         char *target = xseg_get_target(peer->xseg, pr->req);
474         struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
475         if (vio->mreq->state & XS_FAILED){
476                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
477                                 (unsigned long)vio->mreq, vio->mreq->op);
478                 vio->err = 1;
479         }
480         else {
481                 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
482                 char buf[XSEG_MAX_TARGETLEN];
483                 strncpy(buf, target, pr->req->targetlen);
484                 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
485                 if (r < 0) {
486                         XSEGLOG2(&lc, E, "Cannot resize request");
487                         vio->err = 1;
488                 }
489                 else {
490                         target = xseg_get_target(peer->xseg, pr->req);
491                         strncpy(target, buf, pr->req->targetlen);
492                         char *data = xseg_get_data(peer->xseg, pr->req);
493                         struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
494                         *xsnapshot = *xreply;
495                 }
496         }
497
498         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
499         vio->mreq = NULL;
500         conclude_pr(peer, pr);
501
502         //assert volume freezed
503         //unfreeze
504         if (!vi){
505                 XSEGLOG2(&lc, E, "Volume has no volume info");
506                 return 0;
507         }
508
509         vi->flags &= ~ VF_VOLUME_FREEZED;
510
511         xqindex xqi;
512         while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
513                         (xqi = __xq_pop_head(vi->pending_reqs) != Noneidx)) {
514                 struct peer_req *ppr = (struct peer_req *) xqi;
515                 do_accepted_pr(peer, ppr);
516         }
517         return 0;
518 }
519
520 static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
521 {
522         struct vlmcd *vlmc = __get_vlmcd(peer);
523         struct vlmc_io *vio = __get_vlmcio(pr);
524         struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
525         uint64_t pos, datalen, offset;
526         uint32_t targetlen;
527         struct xseg_request *breq;
528         char *target;
529         int i,r;
530         xport p;
531         if (vio->mreq->state & XS_FAILED){
532                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
533                                 (unsigned long)vio->mreq, vio->mreq->op);
534                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
535                 vio->mreq = NULL;
536                 vio->err = 1;
537                 conclude_pr(peer, pr);
538                 return 0;
539         }
540
541         if (!mreply || !mreply->cnt){
542                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
543                 vio->mreq = NULL;
544                 vio->err = 1;
545                 conclude_pr(peer, pr);
546                 return -1;
547         }
548
549         vio->breq_len = mreply->cnt;
550         vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
551         if (!vio->breqs) {
552                 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
553                 vio->mreq = NULL;
554                 vio->err = 1;
555                 conclude_pr(peer, pr);
556                 return -1;
557         }
558
559         pos = 0;
560         __set_vio_state(vio, SERVING);
561         for (i = 0; i < vio->breq_len; i++) {
562                 datalen = mreply->segs[i].size;
563                 offset = mreply->segs[i].offset;
564                 targetlen = mreply->segs[i].targetlen;
565                 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
566                 if (!breq) {
567                         vio->err = 1;
568                         break;
569                 }
570                 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
571                 if (r < 0) {
572                         vio->err = 1;
573                         xseg_put_request(peer->xseg, breq, pr->portno);
574                         break;
575                 }
576                 breq->offset = offset;
577                 breq->size = datalen;
578                 breq->op = pr->req->op;
579                 target = xseg_get_target(peer->xseg, breq);
580                 if (!target) {
581                         vio->err = 1;
582                         xseg_put_request(peer->xseg, breq, pr->portno);
583                         break;
584                 }
585                 strncpy(target, mreply->segs[i].target, targetlen);
586                 r = xseg_set_req_data(peer->xseg, breq, pr);
587                 if (r<0) {
588                         vio->err = 1;
589                         xseg_put_request(peer->xseg, breq, pr->portno);
590                         break;
591                 }
592
593                 // this should work, right ?
594                 breq->data = pr->req->data + pos;
595                 pos += datalen;
596                 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
597                 if (p == NoPort){
598                         void *dummy;
599                         vio->err = 1;
600                         xseg_get_req_data(peer->xseg, breq, &dummy);
601                         xseg_put_request(peer->xseg, breq, pr->portno);
602                         break;
603                 }
604                 r = xseg_signal(peer->xseg, p);
605                 if (r < 0){
606                         XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
607                 }
608                 vio->breqs[i] = breq;
609         }
610         vio->breq_cnt = i;
611         xseg_put_request(peer->xseg, vio->mreq, pr->portno);
612         vio->mreq = NULL;
613         if (i == 0) {
614                 free(vio->breqs);
615                 vio->breqs = NULL;
616                 vio->err = 1;
617                 conclude_pr(peer, pr);
618                 return -1;
619         }
620         return 0;
621 }
622
623 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
624                                 struct xseg_request *req)
625 {
626         struct vlmc_io *vio = __get_vlmcio(pr);
627
628         //assert vio>mreq == req
629         if (vio->mreq != req){
630                 XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
631                                 (unsigned long)vio->mreq, (unsigned long)req,
632                                 vio->state, (unsigned long)vio->breqs[0]);
633                 return -1;
634         }
635
636         switch (vio->mreq->op){
637                 case X_INFO:
638                         mapping_info(peer, pr);
639                         break;
640                 case X_SNAPSHOT:
641                         mapping_snapshot(peer, pr);
642                         break;
643                 case X_CLOSE:
644                         mapping_close(peer, pr);
645                         break;
646                 case X_OPEN:
647                         mapping_open(peer, pr);
648                         break;
649                 case X_MAPR:
650                 case X_MAPW:
651                         mapping_readwrite(peer, pr);
652                         break;
653                 default:
654                         XSEGLOG2(&lc, W, "Invalid mreq op");
655                         //vio->err = 1;
656                         //conclude_pr(peer, pr);
657                         break;
658         }
659
660         return 0;
661 }
662
663 static int handle_serving(struct peerd *peer, struct peer_req *pr, 
664                                 struct xseg_request *req)
665 {
666         struct vlmc_io *vio = __get_vlmcio(pr);
667         struct vlmcd *vlmc = __get_vlmcd(peer);
668         (void)vlmc;
669         struct xseg_request *breq = req;
670
671         if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
672                 XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
673                                 (unsigned long)req, req->op,
674                                 (unsigned long long)req->offset);
675                 vio->err = 1;
676         } else {
677                 //assert breq->serviced == breq->size
678                 pr->req->serviced += breq->serviced;
679         }
680         xseg_put_request(peer->xseg, breq, pr->portno);
681
682         if (!--vio->breq_cnt){
683                 __set_vio_state(vio, CONCLUDED);
684                 free(vio->breqs);
685                 vio->breqs = NULL;
686                 vio->breq_len = 0;
687                 conclude_pr(peer, pr);
688         }
689         return 0;
690 }
691
692 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
693                 enum dispatch_reason reason)
694 {
695         struct vlmc_io *vio = __get_vlmcio(pr);
696         struct vlmcd *vlmc = __get_vlmcd(peer);
697         (void)vlmc;
698
699         if (reason == dispatch_accept)
700                 //assert (pr->req == req)
701                 __set_vio_state(vio, ACCEPTED);
702
703         enum io_state_enum state = __get_vio_state(vio);
704         switch (state) {
705                 case ACCEPTED:
706                         handle_accepted(peer, pr, req);
707                         break;
708                 case MAPPING:
709                         handle_mapping(peer, pr, req);
710                         break;
711                 case SERVING:
712                         handle_serving(peer, pr, req);
713                         break;
714                 case CONCLUDED:
715                         XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
716                         break;
717                 default:
718                         XSEGLOG2(&lc, E, "wtf dude? invalid state");
719                         break;
720         }
721         return 0;
722 }
723
724
725 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
726 {
727         struct vlmc_io *vio;
728         struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
729         int i, j;
730
731         if (!vlmc) {
732                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
733                 return -1;
734         }
735         peer->priv = (void *) vlmc;
736
737         vlmc->volumes = xhash_new(3, STRING);
738         if (!vlmc->volumes){
739                 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
740                 return -1;
741         }
742         vlmc->mportno = NoPort;
743         vlmc->bportno = NoPort;
744
745         BEGIN_READ_ARGS(argc, argv);
746         READ_ARG_ULONG("-mp", vlmc->mportno);
747         READ_ARG_ULONG("-bp", vlmc->bportno);
748         END_READ_ARGS();
749
750         if (vlmc->bportno == NoPort) {
751                 XSEGLOG2(&lc, E, "bportno must be provided");
752                 usage(argv[0]);
753                 return -1;
754         }
755         if (vlmc->mportno == NoPort) {
756                 XSEGLOG2(&lc, E, "mportno must be provided");
757                 usage(argv[0]);
758                 return -1;
759         }
760
761         for (i = 0; i < peer->nr_ops; i++) {
762                 vio = malloc(sizeof(struct vlmc_io));
763                 if (!vio) {
764                         break;
765                 }
766                 vio->mreq = NULL;
767                 vio->breqs = NULL;
768                 vio->breq_cnt = 0;
769                 vio->breq_len = 0;
770                 xlock_release(&vio->lock);
771                 peer->peer_reqs[i].priv = (void *) vio;
772         }
773         if (i < peer->nr_ops) {
774                 for (j = 0; j < i; j++) {
775                         free(peer->peer_reqs[i].priv);
776                 }
777                 return -1;
778         }
779
780
781         const struct sched_param param = { .sched_priority = 99 };
782         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
783
784         return 0;
785 }
786
787 void custom_peer_finalize(struct peerd *peer)
788 {
789         return;
790 }