2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <xseg/xseg.h>
39 #include <xseg/protocol.h>
42 #include <sys/syscall.h>
/* Flag bit: the volume is "frozen" — newly accepted requests are queued on
 * pending_reqs instead of being executed (see do_accepted_pr/handle_accepted). */
51 #define VF_VOLUME_FREEZED (1 << 0)
/* NOTE(review): the struct headers for the members below are elided from this
 * view. They appear to belong to a per-volume info struct, the per-daemon
 * vlmcd struct, and the per-request vlmc_io struct — confirm in the full file. */
/* Volume name; +1 leaves room for the NUL terminator. */
54 char name[XSEG_MAX_TARGETLEN + 1];
/* Requests queued while the volume is frozen (lazily allocated). */
57 struct xq *pending_reqs;
/* The request that triggered the freeze; resumed when active_reqs drains to 0. */
58 struct peer_req *pending_pr;
64 xhash_t *volumes; //hash [volumename] -> struct volume_info
/* Stage of this io in the dispatch state machine
 * (ACCEPTED / MAPPING / SERVING / CONCLUDED — see dispatch()). */
70 volatile enum io_state_enum state;
/* Outstanding request sent to the mapper peer. */
71 struct xseg_request *mreq;
/* Block requests produced from the mapper's reply (one per map segment). */
72 struct xseg_request **breqs;
/* breq_len: number of entries in breqs; breq_cnt: block requests still in flight. */
73 unsigned long breq_len, breq_cnt;
/* Print this peer's extra command-line options (mapper/blocker ports) to stderr. */
76 void custom_peer_usage()
78 fprintf(stderr, "Custom peer options: \n"
80 "-bp : blocker port for blocks\n"
/* Set the io's state-machine stage; body elided from this view. */
84 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
/* Read back the io's current state-machine stage. */
89 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
91 enum io_state_enum state;
/* Recover the vlmc_io stored in the peer request's private pointer. */
96 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
98 return (struct vlmc_io *) pr->priv;
/* Recover the daemon-wide vlmcd stored in the peer's private pointer. */
101 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
103 return (struct vlmcd *) peer->priv;
/* Allocate a struct xq and initialize it empty with capacity nr.
 * Returns NULL on allocation failure (cleanup path elided from this view). */
106 static struct xq * allocate_queue(xqindex nr)
108 struct xq *q = malloc(sizeof(struct xq));
111 if (!xq_alloc_empty(q, nr)){
/* Double the capacity of vi->pending_reqs: allocate a new queue twice the
 * size, move the contents over with __xq_resize, and swap it in.
 * The caller must guarantee vi->pending_reqs is non-NULL (see assert below). */
118 static int doubleup_queue(struct volume_info *vi)
120 //assert vi->pending_reqs
121 XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
122 struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
124 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
/* Noneidx from __xq_resize means the contents could not be migrated. */
129 if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
132 XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
/* Old queue is drained; release it and install the larger one. */
136 xq_free(vi->pending_reqs);
137 free(vi->pending_reqs);
138 vi->pending_reqs = newq;
139 XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
/* Look up a NUL-terminated volume name in vlmc->volumes.
 * Returns the volume_info pointer, or NULL when absent (the failure
 * return path is elided from this view). */
143 static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
145 struct volume_info *vi = NULL;
146 XSEGLOG2(&lc, D, "looking up volume %s", volume);
147 int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
150 XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
153 XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
154 volume, (unsigned long)vi);
/* Look up a volume by a non-NUL-terminated (target, targetlen) pair:
 * copy into a stack buffer and delegate to find_volume().
 * NOTE(review): strncpy alone does not NUL-terminate buf; the terminating
 * `buf[targetlen] = 0;` is presumably on an elided line — confirm. */
158 static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
161 char buf[XSEG_MAX_TARGETLEN+1];
162 strncpy(buf, target, targetlen);
164 XSEGLOG2(&lc, D, "looking up volume %s, len %u",
166 return find_volume(vlmc, buf);
/* Insert vi into vlmc->volumes keyed by vi->name.
 * Fails (warning) if a volume with the same name is already hashed.
 * On -XHASH_ERESIZE the hash is grown and the insert retried. */
170 static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
174 if (find_volume(vlmc, vi->name)){
175 XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
179 XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
180 vi->name, strlen(vi->name), (unsigned long) vi);
181 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
/* Grow-and-retry loop: keep resizing until the insert fits. */
182 while (r == -XHASH_ERESIZE) {
183 xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
184 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
186 XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
187 (unsigned long long) shift);
190 vlmc->volumes = new_hashmap;
191 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
193 XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed",
194 vi->name, strlen(vi->name), (unsigned long) vi);
/* Delete vi from vlmc->volumes by name.
 * Mirrors insert_volume: on -XHASH_ERESIZE the hash is shrunk and the
 * delete retried; a failed shrink aborts the removal with an error. */
200 static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
204 XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)",
205 vi->name, strlen(vi->name), (unsigned long) vi);
206 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
/* Shrink-and-retry loop, symmetric to the grow loop in insert_volume. */
207 while (r == -XHASH_ERESIZE) {
208 xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
209 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
211 XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
212 (unsigned long long) shift);
213 XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed",
214 vi->name, (unsigned long) vi);
217 vlmc->volumes = new_hashmap;
218 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
221 XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed",
222 vi->name, strlen(vi->name), (unsigned long) vi);
224 XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed",
225 vi->name, strlen(vi->name), (unsigned long) vi);
/* Forward declaration: conclude_pr below may re-dispatch a pending pr. */
229 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
/* Finish a peer request: mark its io CONCLUDED, decrement the volume's
 * active request count, and — if that was the last active request and a
 * frozen pr is waiting — resume the waiting pr via do_accepted_pr. */
231 static int conclude_pr(struct peerd *peer, struct peer_req *pr)
233 struct vlmcd *vlmc = __get_vlmcd(peer);
234 struct vlmc_io *vio = __get_vlmcio(pr);
235 char *target = xseg_get_target(peer->xseg, pr->req);
236 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
238 XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);
240 __set_vio_state(vio, CONCLUDED);
247 //assert vi->active_reqs > 0
248 uint32_t ar = --vi->active_reqs;
249 XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
250 vi, vi->name, ar, vi->pending_pr);
/* Last in-flight request drained: kick the pr that froze the volume. */
251 if (!ar && vi->pending_pr)
252 do_accepted_pr(peer, vi->pending_pr);
254 XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
/* A request must freeze its volume when it is a close, a snapshot, or a
 * zero-size flush write (X_WRITE with size 0 and XF_FLUSH set). */
258 static int should_freeze_volume(struct xseg_request *req)
260 if (req->op == X_CLOSE || req->op == X_SNAPSHOT ||
261 (req->op == X_WRITE && !req->size && (req->flags & XF_FLUSH)))
/* Execute an accepted peer request:
 *   - freeze the volume first when the op requires it, deferring the pr
 *     (as vi->pending_pr) until the volume's in-flight requests drain;
 *   - complete zero-size flush writes locally without involving the mapper;
 *   - otherwise translate the op into a mapper request (X_MAPR/X_MAPW/...),
 *     submit it to vlmc->mportno and move the io to the MAPPING state.
 * Errors conclude the pr with vio->err set (error-path lines partly elided). */
266 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
268 struct vlmcd *vlmc = __get_vlmcd(peer);
269 struct vlmc_io *vio = __get_vlmcio(pr);
272 char *target, *mtarget;
275 struct volume_info *vi;
277 XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
278 target = xseg_get_target(peer->xseg, pr->req);
281 conclude_pr(peer, pr);
285 vi = find_volume_len(vlmc, target, pr->req->targetlen);
287 XSEGLOG2(&lc, E, "Cannot find volume");
288 XSEGLOG2(&lc, E, "Pr %lx", pr);
290 conclude_pr(peer, pr);
/* Freeze handling: close/snapshot/flush must run with the volume quiesced. */
294 if (should_freeze_volume(pr->req)){
295 XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
296 vi->flags |= VF_VOLUME_FREEZED;
297 if (vi->active_reqs){
298 //assert vi->pending_pr == NULL;
299 XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
300 vi->name, vi->active_reqs, pr);
305 XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
307 //assert vi->pending_pr == pr
308 vi->pending_pr = NULL;
315 vio->err = 0; //reset error state
317 if (pr->req->op == X_WRITE && !pr->req->size &&
318 (pr->req->flags & (XF_FLUSH|XF_FUA))){
319 //handle flush requests here, so we don't mess with mapper
320 //because of the -1 offset
321 vi->flags &= ~VF_VOLUME_FREEZED;
322 XSEGLOG2(&lc, I, "Completing flush request")[...];
323 pr->req->serviced = pr->req->size;
324 conclude_pr(peer, pr);
/* Unfreezing may have unblocked queued prs — replay them in FIFO order. */
326 while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
327 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
328 struct peer_req *ppr = (struct peer_req *) xqi;
329 do_accepted_pr(peer, ppr);
/* Normal path: build and submit the mapper request. */
334 vio->mreq = xseg_get_request(peer->xseg, pr->portno,
335 vlmc->mportno, X_ALLOC);
339 /* use datalen 0. let mapper allocate buffer space as needed */
340 r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
342 XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
343 vio->mreq, pr, vi->name);
346 mtarget = xseg_get_target(peer->xseg, vio->mreq);
350 strncpy(mtarget, target, pr->req->targetlen);
351 vio->mreq->size = pr->req->size;
352 vio->mreq->offset = pr->req->offset;
353 vio->mreq->flags = 0;
/* Map client ops onto mapper ops; unknown ops abort via out_put. */
354 switch (pr->req->op) {
355 case X_READ: vio->mreq->op = X_MAPR; break;
356 case X_WRITE: vio->mreq->op = X_MAPW; break;
357 case X_INFO: vio->mreq->op = X_INFO; break;
358 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
359 case X_OPEN: vio->mreq->op = X_OPEN; break;
360 case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
361 default: goto out_put;
363 xseg_set_req_data(peer->xseg, vio->mreq, pr);
364 __set_vio_state(vio, MAPPING);
365 p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
368 r = xseg_signal(peer->xseg, p);
370 /* since submission is successful, just print a warning message */
371 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
374 XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);
/* Error cleanup: detach pr from the mreq and return the mreq to the pool. */
378 xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
380 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
383 XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
384 conclude_pr(peer, pr);
/* Queue pr on vi->pending_reqs (to be replayed when the volume unfreezes).
 * The queue is created lazily (capacity 8) and doubled on overflow. */
388 static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
390 XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
392 if (!vi->pending_reqs){
393 //allocate 8 as default. FIXME make it relevant to nr_ops;
394 vi->pending_reqs = allocate_queue(8);
397 if (!vi->pending_reqs){
398 XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
400 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
405 xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
/* Queue full: grow it and retry the append once. */
407 if (doubleup_queue(vi) < 0){
408 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
412 r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
416 XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
421 XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
/* Entry point for a freshly accepted request: look up (or create and hash)
 * the volume_info for the target. If the volume is frozen, park the pr on
 * its pending queue; otherwise run it immediately via do_accepted_pr. */
426 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
427 struct xseg_request *req)
429 struct vlmc_io *vio = __get_vlmcio(pr);
430 struct vlmcd *vlmc = __get_vlmcd(peer);
431 char *target = xseg_get_target(peer->xseg, req);
432 struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
433 XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);
/* First request for this volume: create its volume_info on the fly. */
435 vi = malloc(sizeof(struct volume_info));
438 conclude_pr(peer, pr);
441 strncpy(vi->name, target, req->targetlen);
442 vi->name[req->targetlen] = 0;
444 vi->pending_pr = NULL;
446 vi->pending_reqs = 0;
447 if (insert_volume(vlmc, vi) < 0){
449 conclude_pr(peer, pr);
455 if (vi->flags & VF_VOLUME_FREEZED){
456 XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
458 if (append_to_pending_reqs(vi, pr) < 0){
460 conclude_pr(peer, pr);
466 return do_accepted_pr(peer, pr);
/* Handle the mapper's reply to an X_INFO request: copy the reported size
 * into the client request's reply buffer, release the mreq, conclude. */
470 static int mapping_info(struct peerd *peer, struct peer_req *pr)
472 struct vlmc_io *vio = __get_vlmcio(pr);
473 if (vio->mreq->state & XS_FAILED){
474 XSEGLOG2(&lc, E, "Info req %lx failed",
475 (unsigned long)vio->mreq);
/* Relay the mapper's size answer into the original request's data area. */
479 struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
480 char *data = xseg_get_data(peer->xseg, pr->req);
481 struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
482 xreply->size = xinfo->size;
484 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
486 conclude_pr(peer, pr);
/* Handle the mapper's reply to an X_OPEN: nothing to relay — just release
 * the mreq and conclude the pr (with vio->err on failure). */
490 static int mapping_open(struct peerd *peer, struct peer_req *pr)
492 struct vlmc_io *vio = __get_vlmcio(pr);
493 if (vio->mreq->state & XS_FAILED){
494 XSEGLOG2(&lc, E, "Open req %lx failed",
495 (unsigned long)vio->mreq);
498 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
500 conclude_pr(peer, pr);
/* Handle the mapper's reply to an X_CLOSE. The volume was frozen for the
 * close; afterwards unfreeze it and either remove the (now idle) volume
 * from the hash or replay its queued requests. */
504 static int mapping_close(struct peerd *peer, struct peer_req *pr)
506 struct vlmcd *vlmc = __get_vlmcd(peer);
507 struct vlmc_io *vio = __get_vlmcio(pr);
508 if (vio->mreq->state & XS_FAILED){
509 XSEGLOG2(&lc, E, "Close req %lx failed",
510 (unsigned long)vio->mreq);
513 char *target = xseg_get_target(peer->xseg, pr->req);
514 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
516 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
518 conclude_pr(peer, pr);
520 //assert active_reqs == 1
521 //assert volume freezed
524 XSEGLOG2(&lc, E, "Volume has not volume info");
527 vi->flags &= ~ VF_VOLUME_FREEZED;
/* No queued work: the volume can be dropped from the hash entirely. */
528 if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
529 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
531 if (vi->pending_reqs)
532 xq_free(vi->pending_reqs);
533 remove_volume(vlmc, vi);
/* Queued work exists: replay it until the queue empties or refreezes. */
538 XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
540 while (!(vi->flags & VF_VOLUME_FREEZED) &&
541 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
542 struct peer_req *ppr = (struct peer_req *) xqi;
543 do_accepted_pr(peer, ppr);
545 XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
/* Handle the mapper's reply to an X_SNAPSHOT: resize the client request so
 * its data area fits an xseg_reply_snapshot, copy the reply across (saving
 * and restoring the target name around the resize), then unfreeze the
 * volume and replay any queued requests. */
551 static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
553 struct vlmcd *vlmc = __get_vlmcd(peer);
554 struct vlmc_io *vio = __get_vlmcio(pr);
555 char *target = xseg_get_target(peer->xseg, pr->req);
556 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
557 if (vio->mreq->state & XS_FAILED){
558 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
559 (unsigned long)vio->mreq, vio->mreq->op);
563 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
/* Stash the target name: xseg_resize_request may relocate the request. */
564 char buf[XSEG_MAX_TARGETLEN];
565 strncpy(buf, target, pr->req->targetlen);
566 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
568 XSEGLOG2(&lc, E, "Cannot resize request");
572 target = xseg_get_target(peer->xseg, pr->req);
573 strncpy(target, buf, pr->req->targetlen);
574 char *data = xseg_get_data(peer->xseg, pr->req);
575 struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
576 *xsnapshot = *xreply;
580 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
582 conclude_pr(peer, pr);
584 //assert volume freezed
587 XSEGLOG2(&lc, E, "Volume has no volume info");
590 XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
591 vi->flags &= ~ VF_VOLUME_FREEZED;
/* Replay requests parked while the snapshot held the volume frozen. */
594 while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
595 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
596 struct peer_req *ppr = (struct peer_req *) xqi;
597 do_accepted_pr(peer, ppr);
/* Handle the mapper's reply to X_MAPR/X_MAPW: the reply describes one or
 * more (target, offset, size) segments. Fan the original read/write out
 * into one block request per segment, aimed at the blocker port, moving
 * the io to the SERVING state. Each breq shares the pr's data buffer at
 * the segment's running offset. */
602 static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
604 struct vlmcd *vlmc = __get_vlmcd(peer);
605 struct vlmc_io *vio = __get_vlmcio(pr);
606 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
607 uint64_t pos, datalen, offset;
609 struct xseg_request *breq;
613 if (vio->mreq->state & XS_FAILED){
614 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
615 (unsigned long)vio->mreq, vio->mreq->op);
616 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
619 conclude_pr(peer, pr);
/* An empty map reply is treated as an error. */
623 if (!mreply || !mreply->cnt){
624 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
627 conclude_pr(peer, pr);
631 vio->breq_len = mreply->cnt;
632 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
634 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
637 conclude_pr(peer, pr);
642 __set_vio_state(vio, SERVING);
/* One block request per map segment. */
643 for (i = 0; i < vio->breq_len; i++) {
644 datalen = mreply->segs[i].size;
645 offset = mreply->segs[i].offset;
646 targetlen = mreply->segs[i].targetlen;
647 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
652 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
655 xseg_put_request(peer->xseg, breq, pr->portno);
658 breq->offset = offset;
659 breq->size = datalen;
660 breq->op = pr->req->op;
661 target = xseg_get_target(peer->xseg, breq);
664 xseg_put_request(peer->xseg, breq, pr->portno);
667 strncpy(target, mreply->segs[i].target, targetlen);
668 r = xseg_set_req_data(peer->xseg, breq, pr);
671 xseg_put_request(peer->xseg, breq, pr->portno);
/* Point the breq at the right slice of the original data buffer. */
675 // this should work, right ?
676 breq->data = pr->req->data + pos;
678 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
682 xseg_get_req_data(peer->xseg, breq, &dummy);
683 xseg_put_request(peer->xseg, breq, pr->portno);
686 r = xseg_signal(peer->xseg, p);
688 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
690 vio->breqs[i] = breq;
/* The mapper request is no longer needed once the breqs are in flight. */
693 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
699 conclude_pr(peer, pr);
/* Dispatch the mapper's reply to the per-op handler above, after sanity
 * checking that the completed request really is this io's mreq. */
705 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
706 struct xseg_request *req)
708 struct vlmc_io *vio = __get_vlmcio(pr);
710 //assert vio>mreq == req
711 if (vio->mreq != req){
712 XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
713 (unsigned long)vio->mreq, (unsigned long)req,
714 vio->state, (unsigned long)vio->breqs[0]);
/* Case labels for X_INFO/X_SNAPSHOT/X_CLOSE/X_OPEN/read-write are on
 * elided lines; the calls below correspond to them in order. */
718 switch (vio->mreq->op){
720 mapping_info(peer, pr);
723 mapping_snapshot(peer, pr);
726 mapping_close(peer, pr);
729 mapping_open(peer, pr);
733 mapping_readwrite(peer, pr);
736 XSEGLOG2(&lc, W, "Invalid mreq op");
738 //conclude_pr(peer, pr);
/* Handle completion of one block request: accumulate its serviced bytes
 * into the client request, release it, and conclude the pr once the last
 * outstanding breq (breq_cnt) has come back. */
745 static int handle_serving(struct peerd *peer, struct peer_req *pr,
746 struct xseg_request *req)
748 struct vlmc_io *vio = __get_vlmcio(pr);
749 struct vlmcd *vlmc = __get_vlmcd(peer);
751 struct xseg_request *breq = req;
753 if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
754 XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
755 (unsigned long)req, req->op,
756 (unsigned long long)req->offset);
759 //assert breq->serviced == breq->size
760 pr->req->serviced += breq->serviced;
762 xseg_put_request(peer->xseg, breq, pr->portno);
/* Last block request completed: the whole io is done. */
764 if (!--vio->breq_cnt){
765 __set_vio_state(vio, CONCLUDED);
769 conclude_pr(peer, pr);
/* Peer framework entry point: route each dispatch to the state-appropriate
 * handler. A freshly accepted request starts in ACCEPTED; mapper replies
 * arrive in MAPPING; block replies in SERVING. CONCLUDED should never be
 * dispatched again. */
774 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
775 enum dispatch_reason reason)
777 struct vlmc_io *vio = __get_vlmcio(pr);
778 struct vlmcd *vlmc = __get_vlmcd(peer);
781 if (reason == dispatch_accept)
782 //assert (pr->req == req)
783 __set_vio_state(vio, ACCEPTED);
785 enum io_state_enum state = __get_vio_state(vio);
/* Case labels for the states are on elided lines; handlers in order. */
788 handle_accepted(peer, pr, req);
791 handle_mapping(peer, pr, req);
794 handle_serving(peer, pr, req);
797 XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
800 XSEGLOG2(&lc, E, "wtf dude? invalid state");
/* Initialize the vlmc peer: allocate the daemon state and the volume hash,
 * parse the mandatory -mp (mapper port) / -bp (blocker port) arguments,
 * attach one vlmc_io to every peer request slot, and raise the scheduling
 * policy to real-time FIFO. */
807 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
810 struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
814 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
817 peer->priv = (void *) vlmc;
/* Volume hash starts at size shift 3, keyed by string name. */
819 vlmc->volumes = xhash_new(3, STRING);
821 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
824 vlmc->mportno = NoPort;
825 vlmc->bportno = NoPort;
827 BEGIN_READ_ARGS(argc, argv);
828 READ_ARG_ULONG("-mp", vlmc->mportno);
829 READ_ARG_ULONG("-bp", vlmc->bportno);
/* Both ports are mandatory; bail out if either was not supplied. */
832 if (vlmc->bportno == NoPort) {
833 XSEGLOG2(&lc, E, "bportno must be provided");
837 if (vlmc->mportno == NoPort) {
838 XSEGLOG2(&lc, E, "mportno must be provided");
/* One vlmc_io per peer request slot. */
843 for (i = 0; i < peer->nr_ops; i++) {
844 vio = malloc(sizeof(struct vlmc_io));
852 xlock_release(&vio->lock);
853 peer->peer_reqs[i].priv = (void *) vio;
/* Allocation failed midway: unwind the vios allocated so far.
 * NOTE(review): frees peer_reqs[i].priv inside the j-loop — looks like it
 * should be peer_reqs[j].priv; confirm against the full file. */
855 if (i < peer->nr_ops) {
856 for (j = 0; j < i; j++) {
857 free(peer->peer_reqs[i].priv);
/* Switch the calling thread to SCHED_FIFO at max priority.
 * NOTE(review): sched_setscheduler takes a pid; passing gettid() targets
 * this thread on Linux, but the return value is not checked. */
863 const struct sched_param param = { .sched_priority = 99 };
864 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
869 void custom_peer_finalize(struct peerd *peer)