/*
 * Copyright 2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <unistd.h>
#include <sched.h>
#include <sys/syscall.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
/* NOTE: the peer framework headers declaring struct peerd, struct peer_req,
 * xhash_t, the xq queue helpers and the XSEGLOG2 logging macros are also
 * required here; they are not part of this fragment. */
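
/*
 * vlmcd peer: accepts block I/O requests on volume targets, asks the mapper
 * peer (mportno) where each piece of the volume lives, and then issues the
 * actual object I/O to the blocker peer (bportno). Every peer request moves
 * through the states ACCEPTED -> MAPPING -> SERVING -> CONCLUDED (see
 * dispatch() below).
 *
 * The member declarations that follow are fragments of three structures:
 * struct volume_info (per-volume state: name, pending request queue and a
 * parked peer request; flags and an active request counter are also used
 * further down), struct vlmcd (peer-wide state: the volume hash plus the
 * mapper and blocker ports), and struct vlmc_io (per-request state: the
 * mapper request mreq and the array of blocker requests breqs). The
 * surrounding struct definitions are not shown in this fragment.
 */
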
#define VF_VOLUME_FREEZED (1 << 0)

    char name[XSEG_MAX_TARGETLEN + 1];
    struct xq *pending_reqs;
    struct peer_req *pending_pr;

    xhash_t *volumes; //hash [volumename] -> struct volume_info

    volatile enum io_state_enum state;
    struct xseg_request *mreq;
    struct xseg_request **breqs;
    unsigned long breq_len, breq_cnt;
void custom_peer_usage()
    fprintf(stderr, "Custom peer options: \n"
            "-bp : blocker port for blocks\n"

static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)

static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
    enum io_state_enum state;

static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
{
    return (struct vlmc_io *) pr->priv;
}

static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
{
    return (struct vlmcd *) peer->priv;
}
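
/* Pending-request queues: a frozen volume buffers incoming peer requests in
 * an xq. allocate_queue() creates a queue of nr entries and doubleup_queue()
 * grows a full queue to twice its size before an append is retried. */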
static struct xq * allocate_queue(xqindex nr)
    struct xq *q = malloc(sizeof(struct xq));
    if (!xq_alloc_empty(q, nr)){

static int doubleup_queue(struct volume_info *vi)
    //assert vi->pending_reqs
    XSEGLOG2(&lc, D, "Doubling up queue of volume %s", vi->name);
    struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
    XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Allocation error",
    if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
        XSEGLOG2(&lc, E, "Doubling up queue of volume %s failed. Resize error",
    xq_free(vi->pending_reqs);
    free(vi->pending_reqs);
    vi->pending_reqs = newq;
    XSEGLOG2(&lc, D, "Doubling up queue of volume %s completed", vi->name);
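
/* Volume lookup helpers: volumes are tracked in vlmc->volumes, a hash from
 * volume name to struct volume_info. find_volume_len() copies the
 * (not NUL-terminated) request target into a local buffer before looking it
 * up; insert_volume() and remove_volume() grow or shrink the hash when
 * xhash reports -XHASH_ERESIZE. */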
static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
    struct volume_info *vi = NULL;
    XSEGLOG2(&lc, D, "looking up volume %s", volume);
    int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
    XSEGLOG2(&lc, D, "looking up volume %s failed", volume);
    XSEGLOG2(&lc, D, "looking up volume %s completed. VI: %lx",
            volume, (unsigned long)vi);

static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
    char buf[XSEG_MAX_TARGETLEN+1];
    strncpy(buf, target, targetlen);
    XSEGLOG2(&lc, D, "looking up volume %s, len %u",
    return find_volume(vlmc, buf);

static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
    if (find_volume(vlmc, vi->name)){
        XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
    XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
            vi->name, strlen(vi->name), (unsigned long) vi);
    r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
        xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
        XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
                (unsigned long long) shift);
        vlmc->volumes = new_hashmap;
        r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
    XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx) completed",
            vi->name, strlen(vi->name), (unsigned long) vi);

static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
    XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx)",
            vi->name, strlen(vi->name), (unsigned long) vi);
    r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
        xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
        XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
                (unsigned long long) shift);
        XSEGLOG2(&lc, E, "Removing volume %s, (volume_info: %lx) failed",
                vi->name, (unsigned long) vi);
        vlmc->volumes = new_hashmap;
        r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
    XSEGLOG2(&lc, W, "Removing volume %s, len: %d (volume_info: %lx) failed",
            vi->name, strlen(vi->name), (unsigned long) vi);
    XSEGLOG2(&lc, D, "Removing volume %s, len: %d (volume_info: %lx) completed",
            vi->name, strlen(vi->name), (unsigned long) vi);

static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
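
/* Conclude a peer request: mark its vlmc_io as CONCLUDED and drop the
 * volume's active request count. When the count reaches zero and a peer
 * request is parked in vi->pending_pr (an X_CLOSE or X_SNAPSHOT that froze
 * the volume while requests were in flight), hand it to do_accepted_pr(). */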
static int conclude_pr(struct peerd *peer, struct peer_req *pr)
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

    XSEGLOG2(&lc, D, "Concluding pr %lx, req: %lx vi: %lx", pr, pr->req, vi);

    __set_vio_state(vio, CONCLUDED);

    //assert vi->active_reqs > 0
    uint32_t ar = --vi->active_reqs;
    XSEGLOG2(&lc, D, "vi: %lx, volume name: %s, active_reqs: %lu, pending_pr: %lx",
            vi, vi->name, ar, vi->pending_pr);
    if (!ar && vi->pending_pr)
        do_accepted_pr(peer, vi->pending_pr);

    XSEGLOG2(&lc, D, "Concluded pr %lx, vi: %lx", pr, vi);
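
/* Start servicing an accepted peer request. Flushes (zero-sized writes with
 * XF_FLUSH/XF_FUA) are completed immediately. X_CLOSE and X_SNAPSHOT first
 * freeze the volume; if requests are still active, the pr is parked in
 * vi->pending_pr until they drain. Everything else is translated into a
 * request to the mapper port (X_READ -> X_MAPR, X_WRITE -> X_MAPW, the rest
 * forwarded as-is), the vio moves to MAPPING, and the mapper reply is picked
 * up later by handle_mapping(). */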
static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    char *target, *mtarget;
    struct volume_info *vi;

    XSEGLOG2(&lc, I, "Do accepted pr started for pr %lx", pr);
    target = xseg_get_target(peer->xseg, pr->req);
    conclude_pr(peer, pr);

    vi = find_volume_len(vlmc, target, pr->req->targetlen);
    XSEGLOG2(&lc, E, "Cannot find volume");
    XSEGLOG2(&lc, E, "Pr %lx", pr);
    conclude_pr(peer, pr);

    if (pr->req->op == X_CLOSE || pr->req->op == X_SNAPSHOT){
        XSEGLOG2(&lc, I, "Freezing volume %s", vi->name);
        vi->flags |= VF_VOLUME_FREEZED;
        if (vi->active_reqs){
            //assert vi->pending_pr == NULL;
            XSEGLOG2(&lc, I, "Active reqs of %s: %lu. Pending pr is set to %lx",
                    vi->name, vi->active_reqs, pr);

        XSEGLOG2(&lc, I, "No active reqs of %s. Pending pr is set to NULL",
        //assert vi->pending_pr == pr
        vi->pending_pr = NULL;

    vio->err = 0; //reset error state

    if (pr->req->op == X_WRITE && !pr->req->size &&
            (pr->req->flags & (XF_FLUSH|XF_FUA))){
        //handle flush requests here, so we don't mess with mapper
        //because of the -1 offset
        XSEGLOG2(&lc, I, "Completing flush request");
        pr->req->serviced = pr->req->size;
        conclude_pr(peer, pr);

    vio->mreq = xseg_get_request(peer->xseg, pr->portno,
            vlmc->mportno, X_ALLOC);

    /* use datalen 0. let mapper allocate buffer space as needed */
    r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
    XSEGLOG2(&lc, E, "Cannot prep request %lx, of pr %lx for volume %s",
            vio->mreq, pr, vi->name);
    mtarget = xseg_get_target(peer->xseg, vio->mreq);
    strncpy(mtarget, target, pr->req->targetlen);
    vio->mreq->size = pr->req->size;
    vio->mreq->offset = pr->req->offset;
    vio->mreq->flags = 0;
    switch (pr->req->op) {
        case X_READ: vio->mreq->op = X_MAPR; break;
        case X_WRITE: vio->mreq->op = X_MAPW; break;
        case X_INFO: vio->mreq->op = X_INFO; break;
        case X_CLOSE: vio->mreq->op = X_CLOSE; break;
        case X_OPEN: vio->mreq->op = X_OPEN; break;
        case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
        default: goto out_put;
    }
    xseg_set_req_data(peer->xseg, vio->mreq, pr);
    __set_vio_state(vio, MAPPING);
    p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
    r = xseg_signal(peer->xseg, p);
    /* since submission was successful, just print a warning message */
    XSEGLOG2(&lc, W, "Couldn't signal port %u", p);

    XSEGLOG2(&lc, I, "Pr %lx of volume %s completed", pr, vi->name);

    xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    XSEGLOG2(&lc, E, "Pr %lx of volume %s failed", pr, vi->name);
    conclude_pr(peer, pr);
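
/* Queue a peer request on a frozen volume. The queue is allocated lazily
 * (8 slots by default) and doubled when an append hits a full queue. */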
static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
    XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s",
    if (!vi->pending_reqs){
        //allocate 8 as default. FIXME make it relevant to nr_ops;
        vi->pending_reqs = allocate_queue(8);

    if (!vi->pending_reqs){
        XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",

    xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
    if (doubleup_queue(vi) < 0){
        XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",
    r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
    XSEGLOG2(&lc, E, "Appending pr %lx to vi %lx, volume name %s failed",

    XSEGLOG2(&lc, I, "Appending pr %lx to vi %lx, volume name %s completed",
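
/* Entry point for newly accepted requests. Unknown volumes get a fresh
 * volume_info inserted into the hash; requests on a frozen volume are parked
 * in its pending queue, everything else goes straight to do_accepted_pr(). */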
static int handle_accepted(struct peerd *peer, struct peer_req *pr,
        struct xseg_request *req)
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);
    char *target = xseg_get_target(peer->xseg, req);
    struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
    XSEGLOG2(&lc, I, "Handle accepted for pr %lx, req %lx started", pr, req);

    vi = malloc(sizeof(struct volume_info));
    conclude_pr(peer, pr);

    strncpy(vi->name, target, req->targetlen);
    vi->name[req->targetlen] = 0;
    vi->pending_pr = NULL;
    vi->pending_reqs = 0;
    if (insert_volume(vlmc, vi) < 0){
        conclude_pr(peer, pr);

    if (vi->flags & VF_VOLUME_FREEZED){
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) frozen. Appending to pending_reqs",
        if (append_to_pending_reqs(vi, pr) < 0){
            conclude_pr(peer, pr);

    return do_accepted_pr(peer, pr);
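
/* Mapper reply handlers. Each mapping_*() callback below consumes vio->mreq
 * (the request previously sent to the mapper) and finishes or continues the
 * original peer request. mapping_info() copies the reported volume size into
 * the client's xseg_reply_info. */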
static int mapping_info(struct peerd *peer, struct peer_req *pr)
    struct vlmc_io *vio = __get_vlmcio(pr);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Info req %lx failed",
                (unsigned long)vio->mreq);

    struct xseg_reply_info *xinfo = (struct xseg_reply_info *)xseg_get_data(peer->xseg, vio->mreq);
    char *data = xseg_get_data(peer->xseg, pr->req);
    struct xseg_reply_info *xreply = (struct xseg_reply_info *)data;
    xreply->size = xinfo->size;

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    conclude_pr(peer, pr);
static int mapping_open(struct peerd *peer, struct peer_req *pr)
    struct vlmc_io *vio = __get_vlmcio(pr);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Open req %lx failed",
                (unsigned long)vio->mreq);

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    conclude_pr(peer, pr);
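
/* mapping_close(): the mapper has closed the volume. Conclude the pr,
 * unfreeze the volume and either drop its volume_info (no pending requests)
 * or replay the queued requests through do_accepted_pr(). */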
static int mapping_close(struct peerd *peer, struct peer_req *pr)
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "Close req %lx failed",
                (unsigned long)vio->mreq);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    conclude_pr(peer, pr);

    //assert active_reqs == 1
    //assert volume frozen
    XSEGLOG2(&lc, E, "Volume has no volume info");

    vi->flags &= ~VF_VOLUME_FREEZED;
    if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
        XSEGLOG2(&lc, I, "Volume %s (vi %lx) had no pending reqs. Removing",
        if (vi->pending_reqs)
            xq_free(vi->pending_reqs);
        remove_volume(vlmc, vi);

    XSEGLOG2(&lc, I, "Volume %s (vi %lx) had pending reqs. Handling",
    while (!(vi->flags & VF_VOLUME_FREEZED) &&
            (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
        struct peer_req *ppr = (struct peer_req *) xqi;
        do_accepted_pr(peer, ppr);
    }
    XSEGLOG2(&lc, I, "Volume %s (vi %lx) handling pending reqs completed",
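
/* mapping_snapshot(): propagate the mapper's xseg_reply_snapshot back into
 * the client request (resizing it to hold the reply), then unfreeze the
 * volume and replay any requests queued while the snapshot was in flight. */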
static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                (unsigned long)vio->mreq, vio->mreq->op);

    struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
    char buf[XSEG_MAX_TARGETLEN];
    strncpy(buf, target, pr->req->targetlen);
    int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
    XSEGLOG2(&lc, E, "Cannot resize request");
    target = xseg_get_target(peer->xseg, pr->req);
    strncpy(target, buf, pr->req->targetlen);
    char *data = xseg_get_data(peer->xseg, pr->req);
    struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
    *xsnapshot = *xreply;

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    conclude_pr(peer, pr);

    //assert volume frozen
    XSEGLOG2(&lc, E, "Volume has no volume info");

    XSEGLOG2(&lc, D, "Unfreezing volume %s", vi->name);
    vi->flags &= ~VF_VOLUME_FREEZED;

    while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
            (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
        struct peer_req *ppr = (struct peer_req *) xqi;
        do_accepted_pr(peer, ppr);
    }
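
/* mapping_readwrite(): the mapper returned an xseg_reply_map describing the
 * targets that back this I/O range. Issue one blocker request per map
 * segment, each pointing into the parent request's data buffer, move the
 * vio to SERVING and wait for the blocker replies in handle_serving(). */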
static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
    uint64_t pos, datalen, offset;
    struct xseg_request *breq;

    if (vio->mreq->state & XS_FAILED){
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
                (unsigned long)vio->mreq, vio->mreq->op);
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        conclude_pr(peer, pr);

    if (!mreply || !mreply->cnt){
        xseg_put_request(peer->xseg, vio->mreq, pr->portno);
        conclude_pr(peer, pr);

    vio->breq_len = mreply->cnt;
    vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
    xseg_put_request(peer->xseg, vio->mreq, pr->portno);
    conclude_pr(peer, pr);

    __set_vio_state(vio, SERVING);
    for (i = 0; i < vio->breq_len; i++) {
        datalen = mreply->segs[i].size;
        offset = mreply->segs[i].offset;
        targetlen = mreply->segs[i].targetlen;
        breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
        r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
        xseg_put_request(peer->xseg, breq, pr->portno);
        breq->offset = offset;
        breq->size = datalen;
        breq->op = pr->req->op;
        target = xseg_get_target(peer->xseg, breq);
        xseg_put_request(peer->xseg, breq, pr->portno);
        strncpy(target, mreply->segs[i].target, targetlen);
        r = xseg_set_req_data(peer->xseg, breq, pr);
        xseg_put_request(peer->xseg, breq, pr->portno);
        //point this block request at the parent request's data buffer
        //(at offset pos), instead of a separately allocated buffer
        breq->data = pr->req->data + pos;
        p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
        xseg_get_req_data(peer->xseg, breq, &dummy);
        xseg_put_request(peer->xseg, breq, pr->portno);
        r = xseg_signal(peer->xseg, p);
        XSEGLOG2(&lc, W, "Couldn't signal port %u", p);
        vio->breqs[i] = breq;

    xseg_put_request(peer->xseg, vio->mreq, pr->portno);

    conclude_pr(peer, pr);
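
/* Dispatch a mapper reply to the matching mapping_*() handler based on the
 * op of vio->mreq (X_INFO, X_SNAPSHOT, X_CLOSE, X_OPEN, or the map ops for
 * reads and writes). */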
static int handle_mapping(struct peerd *peer, struct peer_req *pr,
        struct xseg_request *req)
    struct vlmc_io *vio = __get_vlmcio(pr);

    //assert vio->mreq == req
    if (vio->mreq != req){
        XSEGLOG2(&lc, E, "vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
                (unsigned long)vio->mreq, (unsigned long)req,
                vio->state, (unsigned long)vio->breqs[0]);

    switch (vio->mreq->op){
        mapping_info(peer, pr);
        mapping_snapshot(peer, pr);
        mapping_close(peer, pr);
        mapping_open(peer, pr);
        mapping_readwrite(peer, pr);
        XSEGLOG2(&lc, W, "Invalid mreq op");
        //conclude_pr(peer, pr);
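
/* Handle a blocker reply: account the serviced bytes into the original peer
 * request, release the block request, and conclude the pr once the last of
 * vio->breq_cnt outstanding block requests has come back. */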
static int handle_serving(struct peerd *peer, struct peer_req *pr,
        struct xseg_request *req)
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);
    struct xseg_request *breq = req;

    if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
        XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
                (unsigned long)req, req->op,
                (unsigned long long)req->offset);

    //assert breq->serviced == breq->size
    pr->req->serviced += breq->serviced;

    xseg_put_request(peer->xseg, breq, pr->portno);

    if (!--vio->breq_cnt){
        __set_vio_state(vio, CONCLUDED);

        conclude_pr(peer, pr);
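
/* Peer framework hook: route each event to the handler for the vio's current
 * state. Newly accepted requests are put in ACCEPTED first; MAPPING means a
 * mapper reply arrived, SERVING a blocker reply. */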
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
        enum dispatch_reason reason)
    struct vlmc_io *vio = __get_vlmcio(pr);
    struct vlmcd *vlmc = __get_vlmcd(peer);

    if (reason == dispatch_accept)
        //assert (pr->req == req)
        __set_vio_state(vio, ACCEPTED);

    enum io_state_enum state = __get_vio_state(vio);
        handle_accepted(peer, pr, req);
        handle_mapping(peer, pr, req);
        handle_serving(peer, pr, req);
        XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
        XSEGLOG2(&lc, E, "Invalid io state");
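
/* Peer initialization: allocate the vlmcd state and volume hash, read the
 * mapper (-mp) and blocker (-bp) ports from the command line, attach a
 * vlmc_io to every preallocated peer request, and switch the thread to
 * SCHED_FIFO scheduling. */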
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
    struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
    XSEGLOG2(&lc, E, "Cannot alloc vlmc");
    peer->priv = (void *) vlmc;

    vlmc->volumes = xhash_new(3, STRING);
    XSEGLOG2(&lc, E, "Cannot allocate volumes hash");
    vlmc->mportno = NoPort;
    vlmc->bportno = NoPort;

    BEGIN_READ_ARGS(argc, argv);
    READ_ARG_ULONG("-mp", vlmc->mportno);
    READ_ARG_ULONG("-bp", vlmc->bportno);

    if (vlmc->bportno == NoPort) {
        XSEGLOG2(&lc, E, "bportno must be provided");

    if (vlmc->mportno == NoPort) {
        XSEGLOG2(&lc, E, "mportno must be provided");

    for (i = 0; i < peer->nr_ops; i++) {
        vio = malloc(sizeof(struct vlmc_io));
        xlock_release(&vio->lock);
        peer->peer_reqs[i].priv = (void *) vio;
    }
    if (i < peer->nr_ops) {
        for (j = 0; j < i; j++) {
            free(peer->peer_reqs[j].priv);

    const struct sched_param param = { .sched_priority = 99 };
    sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
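
/* Peer teardown hook, invoked by the peer framework when the daemon shuts
 * down. */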
void custom_peer_finalize(struct peerd *peer)