2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
38 #include <xseg/xseg.h>
39 #include <xseg/protocol.h>
42 #include <sys/syscall.h>
/* NOTE(review): this file is a partial extraction -- many original lines are
 * missing, so the struct definitions below appear only as fragments. */
/* Flag bit: volume is "freezed" while a close/snapshot is in flight
 * (set in do_accepted_pr for X_CLOSE/X_SNAPSHOT). */
51 #define VF_VOLUME_FREEZED (1 << 0)
/* struct volume_info fields (fragment): */
54 char name[XSEG_MAX_TARGETLEN + 1];
57 struct xq *pending_reqs;
58 struct peer_req *pending_pr;
/* struct vlmcd (driver state) fields (fragment): */
64 xhash_t *volumes; //hash [volumename] -> struct volume_info
/* struct vlmc_io (per-request io context) fields (fragment): */
70 volatile enum io_state_enum state;
71 struct xseg_request *mreq;
72 struct xseg_request **breqs;
73 unsigned long breq_len, breq_cnt;
/* Print vlmc-specific command-line options to stderr.
 * NOTE(review): lines are missing here (e.g. the "-mp" option text implied
 * by custom_peer_init) -- the body shown is partial. */
76 void custom_peer_usage()
78 fprintf(stderr, "Custom peer options: \n"
80 "-bp : blocker port for blocks\n"
/* Set vio->state (body not visible in this extraction). */
84 static inline void __set_vio_state(struct vlmc_io *vio, enum io_state_enum state)
/* Read vio->state into a local and return it.
 * NOTE(review): presumably done under vio->lock -- body lines missing;
 * confirm against the full source. */
89 static inline enum io_state_enum __get_vio_state(struct vlmc_io *vio)
91 enum io_state_enum state;
/* The per-request vlmc io context is stashed in pr->priv
 * (assigned in custom_peer_init). */
96 static inline struct vlmc_io * __get_vlmcio(struct peer_req *pr)
98 return (struct vlmc_io *) pr->priv;
/* The driver-wide vlmc state is stashed in peer->priv
 * (assigned in custom_peer_init). */
101 static inline struct vlmcd * __get_vlmcd(struct peerd *peer)
103 return (struct vlmcd *) peer->priv;
/* Allocate a struct xq and back it with an empty queue of nr slots.
 * Presumably returns NULL on either allocation failure -- the error-path
 * and return lines are missing from this extraction. */
106 static struct xq * allocate_queue(xqindex nr)
108 struct xq *q = malloc(sizeof(struct xq));
111 if (!xq_alloc_empty(q, nr)){
/* Grow vi->pending_reqs to twice its current capacity: allocate a new
 * queue, migrate entries with __xq_resize, free the old queue and install
 * the new one. Error-path lines are missing from this extraction. */
118 static int doubleup_queue(struct volume_info *vi)
120 //assert vi->pending_reqs
121 struct xq *newq = allocate_queue(vi->pending_reqs->size * 2);
125 if (__xq_resize(vi->pending_reqs, newq) == Noneidx){
130 xq_free(vi->pending_reqs);
131 free(vi->pending_reqs);
132 vi->pending_reqs = newq;
/* Look up a NUL-terminated volume name in vlmc->volumes.
 * Returns the struct volume_info * on success; the failure return is on
 * lines missing from this extraction (callers treat NULL as not-found). */
136 static struct volume_info * find_volume(struct vlmcd *vlmc, char *volume)
138 struct volume_info *vi = NULL;
139 int r = xhash_lookup(vlmc->volumes, (xhashidx) volume,
/* Look up a volume by a non-NUL-terminated target of length targetlen:
 * copy the name into a local buffer, then defer to find_volume().
 * NOTE(review): strncpy alone does not NUL-terminate; the terminating
 * buf[targetlen] = 0 is presumably on a line missing from this
 * extraction (original line 151) -- confirm against the full source. */
146 static struct volume_info * find_volume_len(struct vlmcd *vlmc, char *target,
149 char buf[XSEG_MAX_TARGETLEN+1];
150 strncpy(buf, target, targetlen);
152 XSEGLOG2(&lc, D, "looking up volume %s, len %u",
154 return find_volume(vlmc, buf);
/* Insert vi into vlmc->volumes keyed by vi->name.
 * Refuses duplicate names; on -XHASH_ERESIZE grows the hash map and
 * retries until the insert fits or resize fails. */
158 static int insert_volume(struct vlmcd *vlmc, struct volume_info *vi)
162 if (find_volume(vlmc, vi->name)){
163 XSEGLOG2(&lc, W, "Volume %s found in hash", vi->name);
167 XSEGLOG2(&lc, D, "Inserting volume %s, len: %d (volume_info: %lx)",
168 vi->name, strlen(vi->name), (unsigned long) vi);
169 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
170 while (r == -XHASH_ERESIZE) {
171 xhashidx shift = xhash_grow_size_shift(vlmc->volumes);
172 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
174 XSEGLOG2(&lc, E, "Cannot grow vlmc->volumes to sizeshift %llu",
175 (unsigned long long) shift);
178 vlmc->volumes = new_hashmap;
179 r = xhash_insert(vlmc->volumes, (xhashidx) vi->name, (xhashidx) vi);
/* Delete vi's entry from vlmc->volumes, mirroring insert_volume:
 * on -XHASH_ERESIZE shrink the hash map and retry the delete. */
186 static int remove_volume(struct vlmcd *vlmc, struct volume_info *vi)
190 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
191 while (r == -XHASH_ERESIZE) {
192 xhashidx shift = xhash_shrink_size_shift(vlmc->volumes);
193 xhash_t *new_hashmap = xhash_resize(vlmc->volumes, shift, NULL);
195 XSEGLOG2(&lc, E, "Cannot shrink vlmc->volumes to sizeshift %llu",
196 (unsigned long long) shift);
199 vlmc->volumes = new_hashmap;
200 r = xhash_delete(vlmc->volumes, (xhashidx) vi->name);
/* Forward declaration: conclude_pr() below may re-dispatch a parked request. */
206 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr);
/* Finish a peer request: mark its io state CONCLUDED, drop the volume's
 * active_reqs count, and -- if this was the last active request while a
 * close/snapshot waits in vi->pending_pr -- run the parked request now. */
208 static int conclude_pr(struct peerd *peer, struct peer_req *pr)
210 struct vlmcd *vlmc = __get_vlmcd(peer);
211 struct vlmc_io *vio = __get_vlmcio(pr);
212 char *target = xseg_get_target(peer->xseg, pr->req);
213 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
215 __set_vio_state(vio, CONCLUDED);
222 //assert vi->active_reqs > 0
223 uint32_t ar = --vi->active_reqs;
224 if (!ar && vi->pending_pr)
225 do_accepted_pr(peer, vi->pending_pr);
/* Start servicing an accepted request: translate it into a mapper request
 * and submit it to vlmc->mportno.
 * - X_CLOSE/X_SNAPSHOT freeze the volume first; if requests are still
 *   active the pr parks in vi->pending_pr (resumed by conclude_pr).
 * - Zero-size X_WRITE with XF_FLUSH|XF_FUA completes immediately, never
 *   reaching the mapper (avoids its -1 offset handling).
 * Error paths and returns sit on lines missing from this extraction. */
230 static int do_accepted_pr(struct peerd *peer, struct peer_req *pr)
232 struct vlmcd *vlmc = __get_vlmcd(peer);
233 struct vlmc_io *vio = __get_vlmcio(pr);
236 char *target, *mtarget;
239 struct volume_info *vi;
241 target = xseg_get_target(peer->xseg, pr->req);
245 vi = find_volume_len(vlmc, target, pr->req->targetlen);
247 XSEGLOG2(&lc, E, "Cannot find volume");
251 if (pr->req->op == X_CLOSE || pr->req->op == X_SNAPSHOT){
252 vi->flags |= VF_VOLUME_FREEZED;
253 if (vi->active_reqs){
254 //assert vi->pending_pr == NULL;
259 //assert vi->pending_pr == pr
260 vi->pending_pr = NULL;
266 vio->err = 0; //reset error state
268 if (pr->req->op == X_WRITE && !pr->req->size &&
269 (pr->req->flags & (XF_FLUSH|XF_FUA))){
270 //hanlde flush requests here, so we don't mess with mapper
271 //because of the -1 offset
272 XSEGLOG2(&lc, I, "Completing flush request");
273 pr->req->serviced = pr->req->size;
274 conclude_pr(peer, pr);
278 vio->mreq = xseg_get_request(peer->xseg, pr->portno,
279 vlmc->mportno, X_ALLOC);
283 /* use datalen 0. let mapper allocate buffer space as needed */
284 r = xseg_prep_request(peer->xseg, vio->mreq, pr->req->targetlen, 0);
288 mtarget = xseg_get_target(peer->xseg, vio->mreq);
292 strncpy(mtarget, target, pr->req->targetlen);
293 vio->mreq->size = pr->req->size;
294 vio->mreq->offset = pr->req->offset;
295 vio->mreq->flags = 0;
/* Map the client op onto the mapper op; reads/writes become map lookups,
 * the rest are forwarded unchanged. */
296 switch (pr->req->op) {
297 case X_READ: vio->mreq->op = X_MAPR; break;
298 case X_WRITE: vio->mreq->op = X_MAPW; break;
299 case X_INFO: vio->mreq->op = X_INFO; break;
300 case X_CLOSE: vio->mreq->op = X_CLOSE; break;
301 case X_OPEN: vio->mreq->op = X_OPEN; break;
302 case X_SNAPSHOT: vio->mreq->op = X_SNAPSHOT; break;
303 default: goto out_put;
305 xseg_set_req_data(peer->xseg, vio->mreq, pr);
306 __set_vio_state(vio, MAPPING);
307 p = xseg_submit(peer->xseg, vio->mreq, pr->portno, X_ALLOC);
310 r = xseg_signal(peer->xseg, p);
312 /* since submission is successful, just print a warning message */
313 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
/* Cleanup path: detach our data pointer and return the mapper request. */
319 xseg_get_req_data(peer->xseg, vio->mreq, &dummy);
321 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
324 conclude_pr(peer, pr);
/* Queue pr on vi->pending_reqs (requests arriving while the volume is
 * freezed). Lazily allocates the queue (8 slots) and doubles it on
 * overflow before retrying the append. */
328 static int append_to_pending_reqs(struct volume_info *vi, struct peer_req *pr)
330 if (!vi->pending_reqs){
331 //allocate 8 as default. FIXME make it relevant to nr_ops;
332 vi->pending_reqs = allocate_queue(8);
335 if (!vi->pending_reqs){
336 XSEGLOG2(&lc, E, "Cannot allocate pending reqs queue for volume %s",
341 xqindex r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
343 if (doubleup_queue(vi) < 0)
345 r = __xq_append_tail(vi->pending_reqs, (xqindex) pr);
/* Entry point for a freshly accepted request.
 * Creates and registers a volume_info the first time a volume is seen.
 * If the volume is freezed (close/snapshot in flight) the request is
 * queued on vi->pending_reqs; otherwise it proceeds to do_accepted_pr. */
354 static int handle_accepted(struct peerd *peer, struct peer_req *pr,
355 struct xseg_request *req)
357 struct vlmc_io *vio = __get_vlmcio(pr);
358 struct vlmcd *vlmc = __get_vlmcd(peer);
359 char *target = xseg_get_target(peer->xseg, req);
360 struct volume_info *vi = find_volume_len(vlmc, target, req->targetlen);
/* First request for this volume: build its volume_info. */
362 vi = malloc(sizeof(struct volume_info));
365 conclude_pr(peer, pr);
368 strncpy(vi->name, target, req->targetlen);
369 vi->name[req->targetlen] = 0;
371 vi->pending_pr = NULL;
373 vi->pending_reqs = 0;
374 if (insert_volume(vlmc, vi) < 0){
376 conclude_pr(peer, pr);
382 if (vi->flags & VF_VOLUME_FREEZED){
383 if (append_to_pending_reqs(vi, pr) < 0){
385 conclude_pr(peer, pr);
391 return do_accepted_pr(peer, pr);
/* Completion handler for an X_INFO mapper reply: copy the reported size
 * into the original request's data buffer as a uint64_t, release the
 * mapper request and conclude the pr. */
395 static int mapping_info(struct peerd *peer, struct peer_req *pr)
397 struct vlmc_io *vio = __get_vlmcio(pr);
398 if (vio->mreq->state & XS_FAILED){
399 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
400 (unsigned long)vio->mreq, vio->mreq->op);
404 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, vio->mreq);
405 char *data = xseg_get_data(peer->xseg, pr->req);
406 *(uint64_t *)data = xinfo->size;
408 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
410 conclude_pr(peer, pr);
/* Completion handler for an X_OPEN mapper reply: nothing to copy back;
 * just release the mapper request and conclude the pr. */
414 static int mapping_open(struct peerd *peer, struct peer_req *pr)
416 struct vlmc_io *vio = __get_vlmcio(pr);
417 if (vio->mreq->state & XS_FAILED){
418 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
419 (unsigned long)vio->mreq, vio->mreq->op);
422 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
424 conclude_pr(peer, pr);
/* Completion handler for an X_CLOSE mapper reply.
 * Releases the mapper request, concludes the pr, then unfreezes the
 * volume. If nothing is queued the volume_info is torn down and removed;
 * otherwise queued requests are drained through do_accepted_pr until the
 * queue empties or one of them re-freezes the volume. */
430 static int mapping_close(struct peerd *peer, struct peer_req *pr)
432 struct vlmcd *vlmc = __get_vlmcd(peer);
433 struct vlmc_io *vio = __get_vlmcio(pr);
434 if (vio->mreq->state & XS_FAILED){
435 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
436 (unsigned long)vio->mreq, vio->mreq->op);
437 char *target = xseg_get_target(peer->xseg, pr->req);
438 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
440 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
442 conclude_pr(peer, pr);
444 //assert active_reqs == 1
445 //assert volume freezed
448 XSEGLOG2(&lc, E, "Volume has not volume info");
451 vi->flags &= ~ VF_VOLUME_FREEZED;
452 if (!vi->pending_reqs || !xq_count(vi->pending_reqs)){
453 if (vi->pending_reqs)
454 xq_free(vi->pending_reqs);
455 remove_volume(vlmc, vi);
/* NOTE(review): free(vi) / a return presumably sits on missing lines
 * before the drain loop below -- confirm against the full source. */
460 while (!(vi->flags & VF_VOLUME_FREEZED) &&
461 (xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx) {
462 struct peer_req *ppr = (struct peer_req *) xqi;
463 do_accepted_pr(peer, ppr);
/* Completion handler for an X_SNAPSHOT mapper reply.
 * Copies the mapper's xseg_reply_snapshot into the original request
 * (resizing it first, and restoring the target name that the resize may
 * relocate), releases the mapper request, concludes the pr, then
 * unfreezes the volume and drains any requests queued while the snapshot
 * was in flight. Error paths sit on lines missing from this extraction. */
469 static int mapping_snapshot(struct peerd *peer, struct peer_req *pr)
471 struct vlmcd *vlmc = __get_vlmcd(peer);
472 struct vlmc_io *vio = __get_vlmcio(pr);
473 char *target = xseg_get_target(peer->xseg, pr->req);
474 struct volume_info *vi = find_volume_len(vlmc, target, pr->req->targetlen);
475 if (vio->mreq->state & XS_FAILED){
476 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
477 (unsigned long)vio->mreq, vio->mreq->op);
481 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, vio->mreq);
/* buf need not be NUL-terminated: only targetlen bytes are copied back. */
482 char buf[XSEG_MAX_TARGETLEN];
483 strncpy(buf, target, pr->req->targetlen);
484 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, sizeof(struct xseg_reply_snapshot));
486 XSEGLOG2(&lc, E, "Cannot resize request");
/* The resize may have moved the request; re-fetch and restore the target. */
490 target = xseg_get_target(peer->xseg, pr->req);
491 strncpy(target, buf, pr->req->targetlen);
492 char *data = xseg_get_data(peer->xseg, pr->req);
493 struct xseg_reply_snapshot *xsnapshot = (struct xseg_reply_snapshot *) data;
494 *xsnapshot = *xreply;
498 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
500 conclude_pr(peer, pr);
502 //assert volume freezed
505 XSEGLOG2(&lc, E, "Volume has no volume info");
509 vi->flags &= ~ VF_VOLUME_FREEZED;
/* BUGFIX: '!=' binds tighter than '=', so the original
 *     (xqi = __xq_pop_head(vi->pending_reqs) != Noneidx)
 * assigned the comparison's 0/1 result to xqi, which was then cast to a
 * struct peer_req pointer below. Parenthesize the assignment, matching
 * the equivalent drain loop in mapping_close(). */
512 while (vi->pending_reqs && !(vi->flags & VF_VOLUME_FREEZED) &&
513 ((xqi = __xq_pop_head(vi->pending_reqs)) != Noneidx)) {
514 struct peer_req *ppr = (struct peer_req *) xqi;
515 do_accepted_pr(peer, ppr);
/* Completion handler for an X_MAPR/X_MAPW mapper reply: for every mapped
 * segment returned by the mapper, build one block request targeting the
 * blocker port (vlmc->bportno), point its data at the right slice of the
 * original request's buffer, submit it, and track it in vio->breqs.
 * On any failure the mapper request is dropped and the pr is concluded;
 * the detailed error paths sit on lines missing from this extraction. */
520 static int mapping_readwrite(struct peerd *peer, struct peer_req *pr)
522 struct vlmcd *vlmc = __get_vlmcd(peer);
523 struct vlmc_io *vio = __get_vlmcio(pr);
524 struct xseg_reply_map *mreply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, vio->mreq);
525 uint64_t pos, datalen, offset;
527 struct xseg_request *breq;
531 if (vio->mreq->state & XS_FAILED){
532 XSEGLOG2(&lc, E, "req %lx (op: %d) failed",
533 (unsigned long)vio->mreq, vio->mreq->op);
534 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
537 conclude_pr(peer, pr);
/* An empty map reply is also treated as failure. */
541 if (!mreply || !mreply->cnt){
542 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
545 conclude_pr(peer, pr);
549 vio->breq_len = mreply->cnt;
550 vio->breqs = calloc(vio->breq_len, sizeof(struct xseg_request *));
552 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
555 conclude_pr(peer, pr);
560 __set_vio_state(vio, SERVING);
561 for (i = 0; i < vio->breq_len; i++) {
562 datalen = mreply->segs[i].size;
563 offset = mreply->segs[i].offset;
564 targetlen = mreply->segs[i].targetlen;
565 breq = xseg_get_request(peer->xseg, pr->portno, vlmc->bportno, X_ALLOC);
570 r = xseg_prep_request(peer->xseg, breq, targetlen, datalen);
573 xseg_put_request(peer->xseg, breq, pr->portno);
576 breq->offset = offset;
577 breq->size = datalen;
578 breq->op = pr->req->op;
579 target = xseg_get_target(peer->xseg, breq);
582 xseg_put_request(peer->xseg, breq, pr->portno);
585 strncpy(target, mreply->segs[i].target, targetlen);
586 r = xseg_set_req_data(peer->xseg, breq, pr);
589 xseg_put_request(peer->xseg, breq, pr->portno);
/* Share the original request's buffer at the running offset instead of
 * copying. NOTE(review): flagged by the author below -- relies on the
 * blocker honoring an externally supplied data pointer; confirm. */
593 // this should work, right ?
594 breq->data = pr->req->data + pos;
596 p = xseg_submit(peer->xseg, breq, pr->portno, X_ALLOC);
600 xseg_get_req_data(peer->xseg, breq, &dummy);
601 xseg_put_request(peer->xseg, breq, pr->portno);
604 r = xseg_signal(peer->xseg, p);
/* Submission already succeeded; a failed signal is only a warning. */
606 XSEGLOG2(&lc, W, "Couldnt signal port %u", p);
608 vio->breqs[i] = breq;
611 xseg_put_request(peer->xseg, vio->mreq, pr->portno);
617 conclude_pr(peer, pr);
/* Dispatch a completed mapper request (vio->mreq) to the matching
 * mapping_* completion handler based on the mapper op. */
623 static int handle_mapping(struct peerd *peer, struct peer_req *pr,
624 struct xseg_request *req)
626 struct vlmc_io *vio = __get_vlmcio(pr);
628 //assert vio>mreq == req
629 if (vio->mreq != req){
630 XSEGLOG2(&lc, E ,"vio->mreq %lx, req: %lx state: %d breq[0]: %lx",
631 (unsigned long)vio->mreq, (unsigned long)req,
632 vio->state, (unsigned long)vio->breqs[0]);
636 switch (vio->mreq->op){
638 mapping_info(peer, pr);
641 mapping_snapshot(peer, pr);
644 mapping_close(peer, pr);
647 mapping_open(peer, pr);
651 mapping_readwrite(peer, pr);
654 XSEGLOG2(&lc, W, "Invalid mreq op");
656 //conclude_pr(peer, pr);
/* Handle completion of one block request: accumulate its serviced bytes
 * into the original request, release it, and conclude the pr once the
 * last outstanding block request (vio->breq_cnt) has come back. */
663 static int handle_serving(struct peerd *peer, struct peer_req *pr,
664 struct xseg_request *req)
666 struct vlmc_io *vio = __get_vlmcio(pr);
667 struct vlmcd *vlmc = __get_vlmcd(peer);
669 struct xseg_request *breq = req;
671 if (breq->state & XS_FAILED && !(breq->state & XS_SERVED)) {
672 XSEGLOG2(&lc, E, "req %lx (op: %d) failed at offset %llu\n",
673 (unsigned long)req, req->op,
674 (unsigned long long)req->offset);
677 //assert breq->serviced == breq->size
678 pr->req->serviced += breq->serviced;
680 xseg_put_request(peer->xseg, breq, pr->portno);
682 if (!--vio->breq_cnt){
683 __set_vio_state(vio, CONCLUDED);
687 conclude_pr(peer, pr);
/* Main dispatch entry: route a request to the handler for the pr's
 * current io state (ACCEPTED -> handle_accepted, MAPPING ->
 * handle_mapping, SERVING -> handle_serving). Newly accepted requests
 * are first marked ACCEPTED; dispatch on CONCLUDED is invalid. */
692 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
693 enum dispatch_reason reason)
695 struct vlmc_io *vio = __get_vlmcio(pr);
696 struct vlmcd *vlmc = __get_vlmcd(peer);
699 if (reason == dispatch_accept)
700 //assert (pr->req == req)
701 __set_vio_state(vio, ACCEPTED);
703 enum io_state_enum state = __get_vio_state(vio);
706 handle_accepted(peer, pr, req);
709 handle_mapping(peer, pr, req);
712 handle_serving(peer, pr, req);
715 XSEGLOG2(&lc, W, "invalid state. dispatch called for CONCLUDED");
718 XSEGLOG2(&lc, E, "wtf dude? invalid state");
/* Initialize the vlmc peer: allocate the driver state and the volumes
 * hash, parse the mandatory -mp (mapper port) and -bp (blocker port)
 * arguments, allocate one struct vlmc_io per preallocated peer request,
 * and request realtime scheduling for this thread (best effort). */
725 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
728 struct vlmcd *vlmc = malloc(sizeof(struct vlmcd));
732 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
735 peer->priv = (void *) vlmc;
737 vlmc->volumes = xhash_new(3, STRING);
739 XSEGLOG2(&lc, E, "Cannot alloc vlmc");
742 vlmc->mportno = NoPort;
743 vlmc->bportno = NoPort;
745 BEGIN_READ_ARGS(argc, argv);
746 READ_ARG_ULONG("-mp", vlmc->mportno);
747 READ_ARG_ULONG("-bp", vlmc->bportno);
750 if (vlmc->bportno == NoPort) {
751 XSEGLOG2(&lc, E, "bportno must be provided");
755 if (vlmc->mportno == NoPort) {
756 XSEGLOG2(&lc, E, "mportno must be provided");
761 for (i = 0; i < peer->nr_ops; i++) {
762 vio = malloc(sizeof(struct vlmc_io));
770 xlock_release(&vio->lock);
771 peer->peer_reqs[i].priv = (void *) vio;
/* Allocation of slot i failed: unwind the vlmc_io's already attached. */
773 if (i < peer->nr_ops) {
774 for (j = 0; j < i; j++) {
/* BUGFIX: free slot j, not slot i -- peer_reqs[i].priv was never
 * assigned (its malloc just failed), so the original freed a stale
 * pointer i times and leaked every successfully allocated vio. */
775 free(peer->peer_reqs[j].priv);
781 const struct sched_param param = { .sched_priority = 99 };
/* BUGFIX(encoding): "&param" had been mangled to the HTML entity
 * residue "¶m" ("&para;" + "m"); restored to take param's address. */
782 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
/* Teardown hook for the vlmc peer; its definition continues past this
 * extraction, so only the signature is visible here. */
787 void custom_peer_finalize(struct peerd *peer)