8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
16 #include <sys/syscall.h>
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
22 #define SHA256_DIGEST_SIZE 32
23 /* hex representation of sha256 value takes up double the sha256 size */
24 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26 #define block_size (1<<22) //FIXME this should be defined here?
27 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
28 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
30 #define MF_OBJECT_EXIST (1 << 0)
31 #define MF_OBJECT_COPYING (1 << 1)
32 #define MF_OBJECT_WRITING (1 << 2)
33 #define MF_OBJECT_DELETING (1 << 3)
35 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
36 extern struct log_ctx lc;
38 char *magic_string = "This a magic string. Please hash me";
39 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
40 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
42 //internal mapper states
55 char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
56 struct xq pending; /* pending peer_reqs on this object */
60 #define MF_MAP_LOADING (1 << 0)
61 #define MF_MAP_DESTROYED (1 << 1)
62 #define MF_MAP_WRITING (1 << 2)
63 #define MF_MAP_DELETING (1 << 3)
64 #define MF_MAP_DROPPING_CACHE (1 << 4)
66 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
67 MF_MAP_DROPPING_CACHE)
73 char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
74 xhash_t *objects; /* obj_index --> map_node */
75 struct xq pending; /* pending peer_reqs on this map */
79 xport bportno; /* blocker that accesses data */
80 xport mbportno; /* blocker that accesses maps */
81 xhash_t *hashmaps; // hash_function(target) --> struct map
85 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
86 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
87 struct map_node *copyup_node;
88 int err; /* error flag */
91 enum mapper_state state;
94 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
95 void print_map(struct map *m);
101 static inline struct mapperd * __get_mapperd(struct peerd *peer)
/* The mapper daemon's global state is stashed in the peer's private pointer. */
103 return (struct mapperd *) peer->priv;
106 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
/* Per-request mapper I/O context lives in the peer request's private pointer. */
108 return (struct mapper_io *) pr->priv;
111 static inline uint64_t calc_map_obj(struct map *map)
/* Number of objects needed to cover map->size, rounding up to a whole block.
 * NOTE(review): the rounding increment and return statement fall outside this
 * excerpt -- presumably nr_objs++ / return nr_objs; confirm in full source. */
113 uint64_t nr_objs = map->size / block_size;
114 if (map->size % block_size)
119 static uint32_t calc_nr_obj(struct xseg_request *req)
/* Count how many objects (blocks) the byte range [offset, offset+size) of the
 * request touches. The first object may be covered only partially because of
 * the intra-block offset; subsequent objects are block-aligned. */
122 uint64_t rem_size = req->size;
123 uint64_t obj_offset = req->offset & (block_size -1); //modulo
124 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
/* Consume the (possibly partial) first object, then whole blocks. */
125 rem_size -= obj_size;
126 while (rem_size > 0) {
127 obj_size = (rem_size > block_size) ? block_size : rem_size;
128 rem_size -= obj_size;
136 * Maps handling functions
139 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
/* Look up a cached struct map by volume name; returns NULL when not cached.
 * target need not be NUL-terminated, hence the local copy. */
142 struct map *m = NULL;
143 char buf[XSEG_MAX_TARGETLEN+1];
144 //assert targetlen <= XSEG_MAX_TARGETLEN
/* NOTE(review): strncpy does not guarantee NUL-termination; the
 * buf[targetlen] = 0 line is not visible in this excerpt -- confirm it exists
 * before buf is used as a string key below. */
145 strncpy(buf, target, targetlen);
147 XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
148 r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
155 static int insert_map(struct mapperd *mapper, struct map *map)
/* Insert map into mapper->hashmaps keyed by its volume name.
 * Refuses duplicates; grows the hash table on demand and retries. */
159 if (find_map(mapper, map->volume, map->volumelen)){
160 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
164 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
165 map->volume, strlen(map->volume), (unsigned long) map);
166 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* -XHASH_ERESIZE means the table is full: grow it and retry the insert. */
167 while (r == -XHASH_ERESIZE) {
168 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
169 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
171 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
172 (unsigned long long) shift);
175 mapper->hashmaps = new_hashmap;
176 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
182 static int remove_map(struct mapperd *mapper, struct map *map)
/* Remove map from mapper->hashmaps (mirror of insert_map).
 * Shrinks the hash table on demand and retries the delete. */
186 //assert no pending pr on map
188 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* -XHASH_ERESIZE on delete means the table wants to shrink: resize and retry. */
189 while (r == -XHASH_ERESIZE) {
190 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
191 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
193 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
194 (unsigned long long) shift);
197 mapper->hashmaps = new_hashmap;
198 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
205 static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
/* Load the map named target (targetlen bytes) for peer request pr.
 *
 * If the map is not cached: allocate and initialize a struct map, mark it
 * MF_MAP_LOADING, queue pr on it, insert it in the mapper cache, and submit
 * an asynchronous read of the map contents to the maps blocker (mbportno).
 * If the map is cached and still loading: just queue pr on its pending queue.
 * If the map is cached and loaded: dispatch pr immediately.
 *
 * NOTE(review): the excerpt omits the error-label structure between the
 * visible statements; the cleanup sequence at the bottom (put request,
 * remove map, fail pending, free hashes/queues) is the failure path. */
210 struct xseg_request *req;
211 struct mapperd *mapper = __get_mapperd(peer);
214 struct map *m = find_map(mapper, target, targetlen);
216 m = malloc(sizeof(struct map));
218 XSEGLOG2(&lc, E, "Cannot allocate map ");
/* Initialize the new map: NUL-terminated volume name, loading flag,
 * pending-request queue sized to the peer's op count, and object hash. */
222 strncpy(m->volume, target, targetlen);
223 m->volume[targetlen] = 0;
224 m->volumelen = targetlen;
225 m->flags = MF_MAP_LOADING;
226 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
228 XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
232 m->objects = xhash_new(3, INTEGER);
234 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
238 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
243 r = insert_map(mapper, m);
/* Build and submit the read request for the map block to the maps blocker. */
248 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
250 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
255 r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
257 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
262 char *reqtarget = xseg_get_target(peer->xseg, req);
265 strncpy(reqtarget, target, req->targetlen);
267 req->size = block_size;
269 r = xseg_set_req_data(peer->xseg, req, pr);
271 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
275 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
277 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
281 r = xseg_signal(peer->xseg, p);
283 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
/* --- failure path: undo request setup, evict the map, fail queued prs --- */
287 xseg_get_req_data(peer->xseg, req, &dummy);
289 xseg_put_request(peer->xseg, req, pr->portno);
292 remove_map(mapper, m);
294 while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
295 fail(peer, (struct peer_req *) idx);
299 xhash_free(m->objects);
301 xq_free(&m->pending);
303 XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
/* --- map already cached: either still loading (queue pr) or ready (dispatch) --- */
309 //assert map loading when this is reached
310 if (m->flags & MF_MAP_LOADING) {
311 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
312 "Adding to pending queue", m->volume);
313 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
316 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
317 my_dispatch(peer, pr, pr->req);
323 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
324 char *target, uint32_t targetlen, struct map **m)
/* Return the cached map for target in *m, or kick off an asynchronous load.
 * If the map exists but is busy (MF_MAP_NOT_READY), pr is queued on the map's
 * pending queue and the caller must wait (presumably MF_PENDING is returned;
 * the return statements are not visible in this excerpt -- confirm). */
326 struct mapperd *mapper = __get_mapperd(peer);
328 *m = find_map(mapper, target, targetlen);
330 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
331 if ((*m)->flags & MF_MAP_NOT_READY) {
332 __xq_append_tail(&(*m)->pending, (xqindex) pr);
333 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
335 //} else if ((*m)->flags & MF_MAP_DESTROYED){
339 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
/* Not cached: start loading it; pr is queued inside load_map. */
343 r = load_map(peer, pr, target, targetlen);
350 * Object handling functions
353 struct map_node *find_object(struct map *map, uint64_t obj_index)
/* Look up the map_node for object index obj_index in the map's object hash. */
356 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
362 static int insert_object(struct map *map, struct map_node *mn)
/* Insert mn into map->objects keyed by its object index,
 * growing the hash once on -XHASH_ERESIZE and retrying. */
364 //FIXME no find object first
365 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
366 if (r == -XHASH_ERESIZE) {
367 unsigned long shift = xhash_grow_size_shift(map->objects);
368 map->objects = xhash_resize(map->objects, shift, NULL);
/* NOTE(review): xhash_resize failure handling not visible in this excerpt. */
371 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
378 * map read/write functions
380 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
/* Decode one entry of an old-style (pithos, "type 0") map: buf holds a raw
 * SHA-256 digest; store its hex string as the object name and mark it as an
 * existing object. */
383 //hexlify sha256 value
384 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
385 sprintf(mn->object+2*i, "%02x", buf[i]);
/* Hexlified digest is exactly 2 * SHA256_DIGEST_SIZE chars; NUL-terminate. */
388 mn->object[SHA256_DIGEST_SIZE * 2] = 0;
389 mn->objectlen = SHA256_DIGEST_SIZE * 2;
390 mn->flags = MF_OBJECT_EXIST;
393 static inline void map_to_object(struct map_node *mn, char *buf)
/* Decode one on-disk map entry ("type 1" format, objectsize_in_map bytes):
 * buf[0] is the existence/transparency byte, buf[1..] is the object name
 * padded with zeros up to XSEG_MAX_TARGETLEN. */
398 mn->flags |= MF_OBJECT_EXIST;
399 memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
400 mn->object[XSEG_MAX_TARGETLEN] = 0;
/* Name is zero-padded on disk, so strlen recovers the real length. */
401 mn->objectlen = strlen(mn->object);
404 static inline void object_to_map(char* buf, struct map_node *mn)
/* Encode mn as one on-disk map entry (inverse of map_to_object):
 * existence byte, then the object name zero-padded to XSEG_MAX_TARGETLEN. */
406 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
407 memcpy(buf+1, mn->object, mn->objectlen);
408 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
411 static inline void mapheader_to_map(struct map *m, char *buf)
/* Serialize the map header into buf: the magic SHA-256 value followed by the
 * volume size (mapheader_size bytes total). */
414 memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
415 pos += SHA256_DIGEST_SIZE;
416 memcpy(buf + pos, &m->size, sizeof(m->size));
417 pos += sizeof(m->size);
421 static int object_write(struct peerd *peer, struct peer_req *pr,
422 struct map *map, struct map_node *mn)
/* Submit an asynchronous write of a single map entry (mn) to the maps blocker.
 * The entry is written at its slot inside the on-disk map of `map`
 * (offset = mapheader_size + objectidx * objectsize_in_map).
 * The failure path at the bottom releases the request. */
425 struct mapperd *mapper = __get_mapperd(peer);
426 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
427 mapper->mbportno, X_ALLOC);
429 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
431 mn->object, map->volume, (unsigned long long) mn->objectidx);
434 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
436 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
438 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Target of the write is the map (volume) itself, not the data object. */
441 char *target = xseg_get_target(peer->xseg, req);
442 strncpy(target, map->volume, req->targetlen);
443 req->size = objectsize_in_map;
444 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
446 char *data = xseg_get_data(peer->xseg, req);
447 object_to_map(data, mn);
449 r = xseg_set_req_data(peer->xseg, req, pr);
451 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
453 mn->object, map->volume, (unsigned long long) mn->objectidx);
456 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
458 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
460 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* A failed signal is not fatal; the request is already queued. */
463 r = xseg_signal(peer->xseg, p);
465 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
467 XSEGLOG2(&lc, I, "Writing object %s \n\t"
469 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* --- failure path: detach pr from the request and release it --- */
474 xseg_get_req_data(peer->xseg, req, &dummy);
476 xseg_put_request(peer->xseg, req, pr->portno);
478 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
480 mn->object, map->volume, (unsigned long long) mn->objectidx);
484 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
/* Serialize the whole map (header + one entry per object) and submit it as a
 * single asynchronous write to the maps blocker. On success the map is marked
 * MF_MAP_WRITING; completion is handled by handle_mapwrite. */
487 struct mapperd *mapper = __get_mapperd(peer);
489 uint64_t i, pos, max_objidx = calc_map_obj(map);
490 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
491 mapper->mbportno, X_ALLOC);
493 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
496 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
497 mapheader_size + max_objidx * objectsize_in_map);
499 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
502 char *target = xseg_get_target(peer->xseg, req);
503 strncpy(target, map->volume, req->targetlen);
504 char *data = xseg_get_data(peer->xseg, req);
/* Header first, then each map entry in object-index order. */
505 mapheader_to_map(map, data);
506 pos = mapheader_size;
508 req->size = req->datalen;
511 if (map->size % block_size)
513 for (i = 0; i < max_objidx; i++) {
514 mn = find_object(map, i);
516 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
517 (unsigned long long) i, map->volume);
520 object_to_map(data+pos, mn);
521 pos += objectsize_in_map;
523 r = xseg_set_req_data(peer->xseg, req, pr);
525 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
529 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
531 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
/* A failed signal is not fatal; the request is already queued. */
535 r = xseg_signal(peer->xseg, p);
537 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
539 map->flags |= MF_MAP_WRITING;
540 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
/* --- failure path: detach pr from the request and release it --- */
544 xseg_get_req_data(peer->xseg, req, &dummy);
546 xseg_put_request(peer->xseg, req, pr->portno);
548 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
552 static int read_map (struct peerd *peer, struct map *map, char *buf)
/* Parse a just-read map block (buf) and populate map->objects.
 * Two on-disk formats are supported, distinguished by the first
 * SHA256_DIGEST_SIZE bytes:
 *   type 1 (ours):   magic_sha256 header + size + objectsize_in_map entries;
 *   type 0 (pithos): a packed array of raw SHA-256 digests, terminated by an
 *                    all-zero digest, from which map->size is derived. */
554 char nulls[SHA256_DIGEST_SIZE];
555 memset(nulls, 0, SHA256_DIGEST_SIZE);
/* An all-zero leading digest means an empty/invalid map. */
557 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
562 //type 1, our type, type 0 pithos map
563 int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
564 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
567 struct map_node *map_node;
/* --- type 1: header carries the volume size; entries follow --- */
569 pos = SHA256_DIGEST_SIZE;
570 map->size = *(uint64_t *) (buf + pos);
571 pos += sizeof(uint64_t);
572 nr_objs = map->size / block_size;
573 if (map->size % block_size)
575 map_node = calloc(nr_objs, sizeof(struct map_node));
579 for (i = 0; i < nr_objs; i++) {
580 map_node[i].map = map;
581 map_node[i].objectidx = i;
582 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
584 map_to_object(&map_node[i], buf + pos);
585 pos += objectsize_in_map;
586 r = insert_object(map, &map_node[i]); //FIXME error check
/* --- type 0 (pithos): scan digests until the zero terminator --- */
590 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
591 map_node = calloc(max_nr_objs, sizeof(struct map_node));
594 for (i = 0; i < max_nr_objs; i++) {
595 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
597 map_node[i].objectidx = i;
598 map_node[i].map = map;
599 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
601 pithosmap_to_object(&map_node[i], buf + pos);
602 pos += SHA256_DIGEST_SIZE;
603 r = insert_object(map, &map_node[i]); //FIXME error check
/* Pithos maps have no stored size: derive it from the entry count. */
605 map->size = i * block_size;
607 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
610 //FIXME cleanup on error
617 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
/* Associate req -> mn in mio->copyups_nodes so that the map_node of an
 * in-flight copy-up can be recovered when the reply arrives.
 * Passing mn == NULL removes the association (the delete branch below).
 * Resizes the hash on -XHASH_ERESIZE and retries in both cases.
 * NOTE(review): the last visible line also caches mn in mio->copyup_node;
 * the branch structure around it is not visible in this excerpt. */
622 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
623 if (r == -XHASH_ERESIZE) {
624 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
625 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
628 mio->copyups_nodes = new_hashmap;
629 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
/* --- removal path: delete the req key, shrinking the hash if asked to --- */
633 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
634 if (r == -XHASH_ERESIZE) {
635 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
636 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
639 mio->copyups_nodes = new_hashmap;
640 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
645 mio->copyup_node = mn;
649 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
/* Look up the map_node previously associated with req by __set_copyup_node.
 * Falls back to the cached mio->copyup_node (presumably when the hash lookup
 * fails -- the branch is not visible in this excerpt). */
653 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
658 return mio->copyup_node;
661 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
/* Copy-on-write an object: derive a new object name as the hexlified SHA-256
 * of "<volume><objectidx>", then submit an X_COPY of the current object to
 * that new name via the data blocker (bportno).
 * Special case: if the current object is the well-known zero block, skip the
 * copy and go straight to writing the new map entry (copyup_zeroblock path).
 * On success the node is marked MF_OBJECT_COPYING; the reply is handled by
 * handle_copyup. */
663 struct mapperd *mapper = __get_mapperd(peer);
664 struct mapper_io *mio = __get_mapper_io(pr);
665 struct map *map = mn->map;
670 //struct sha256_ctx sha256ctx;
671 uint32_t newtargetlen;
672 char new_target[XSEG_MAX_TARGETLEN + 1];
673 unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
674 char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
675 strncpy(new_object, map->volume, map->volumelen);
676 sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
677 new_object[XSEG_MAX_TARGETLEN + 19] = 0;
/* Hash the synthetic name and hexlify it into the new target name. */
679 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
680 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
681 sprintf (new_target + 2*i, "%02x", buf[i]);
682 newtargetlen = SHA256_DIGEST_SIZE * 2;
/* Copying a block of zeros is pointless: only the map entry must change. */
684 if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
685 goto copyup_zeroblock;
687 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
688 mapper->bportno, X_ALLOC);
691 r = xseg_prep_request(peer->xseg, req, newtargetlen,
692 sizeof(struct xseg_request_copy));
696 char *target = xseg_get_target(peer->xseg, req);
697 strncpy(target, new_target, req->targetlen);
/* X_COPY payload: the source object name; request target is the new object. */
699 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
700 strncpy(xcopy->target, mn->object, mn->objectlen);
701 xcopy->targetlen = mn->objectlen;
704 req->size = block_size;
706 r = xseg_set_req_data(peer->xseg, req, pr);
/* Remember req -> mn so handle_copyup can find the node on completion. */
709 r = __set_copyup_node(mio, req, mn);
710 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
714 xseg_signal(peer->xseg, p);
717 mn->flags |= MF_OBJECT_COPYING;
718 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* --- failure path: drop the association and release the request --- */
722 r = __set_copyup_node(mio, req, NULL);
723 xseg_get_req_data(peer->xseg, req, &dummy);
725 xseg_put_request(peer->xseg, req, pr->portno);
727 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
/* --- zero-block shortcut: write the new map entry without any data copy --- */
731 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
732 "Proceeding in writing the new object in map");
733 /* construct a tmp map_node for writing purposes */
734 struct map_node newmn = *mn;
735 newmn.flags = MF_OBJECT_EXIST;
736 strncpy(newmn.object, new_target, newtargetlen);
737 newmn.object[newtargetlen] = 0;
738 newmn.objectlen = newtargetlen;
739 newmn.objectidx = mn->objectidx;
/* NOTE(review): req has not been allocated on this path -- confirm in the
 * full source what key __set_copyup_node associates mn with here. */
740 r = __set_copyup_node(mio, req, mn);
741 r = object_write(peer, pr, map, &newmn);
742 if (r != MF_PENDING){
743 XSEGLOG2(&lc, E, "Object write returned error for object %s"
744 "\n\t of map %s [%llu]",
745 mn->object, map->volume, (unsigned long long) mn->objectidx);
748 mn->flags |= MF_OBJECT_WRITING;
749 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
754 * request handling functions
757 static int handle_mapread(struct peerd *peer, struct peer_req *pr,
758 struct xseg_request *req)
/* Completion handler for the map-load read submitted by load_map.
 * On success: parse the block via read_map, clear MF_MAP_LOADING, and
 * re-dispatch every pr queued on the map while it was loading.
 * On failure: clear the flag, drain and (presumably) fail the queued prs,
 * and evict the half-loaded map from the cache. */
762 char buf[XSEG_MAX_TARGETLEN];
763 struct mapperd *mapper = __get_mapperd(peer);
764 //assert req->op = X_READ;
765 char *target = xseg_get_target(peer->xseg, req);
766 struct map *map = find_map(mapper, target, req->targetlen);
769 //assert map->flags & MF_MAP_LOADING
771 if (req->state & XS_FAILED)
774 char *data = xseg_get_data(peer->xseg, req);
775 r = read_map(peer, map, data);
779 xseg_put_request(peer->xseg, req, pr->portno);
780 map->flags &= ~MF_MAP_LOADING;
781 XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
/* Snapshot the queue length so prs re-queued by my_dispatch are not
 * re-processed in this same drain. */
782 uint64_t qsize = xq_count(&map->pending);
783 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
785 struct peer_req *preq = (struct peer_req *) idx;
786 my_dispatch(peer, preq, preq->req);
/* --- failure path --- */
791 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
792 xseg_put_request(peer->xseg, req, pr->portno);
793 map->flags &= ~MF_MAP_LOADING;
794 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
795 struct peer_req *preq = (struct peer_req *) idx;
798 remove_map(mapper, map);
799 //FIXME not freeing up all objects + object hash
/* --- map not found for the reply's target: log and drop the request --- */
804 strncpy(buf, target, req->targetlen);
805 buf[req->targetlen] = 0;
806 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
807 xseg_put_request(peer->xseg, req, pr->portno);
811 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
812 struct xseg_request *req)
/* Completion handler for the whole-map write submitted by map_write.
 * Mirrors handle_mapread: on success clear MF_MAP_WRITING and re-dispatch the
 * prs queued on the map; on failure drain the queue and evict the map. */
815 char buf[XSEG_MAX_TARGETLEN];
816 struct mapperd *mapper = __get_mapperd(peer);
817 //assert req->op = X_WRITE;
818 char *target = xseg_get_target(peer->xseg, req);
819 struct map *map = find_map(mapper, target, req->targetlen);
821 fprintf(stderr, "couldn't find map\n");
824 //assert map->flags & MF_MAP_WRITING
826 if (req->state & XS_FAILED){
827 fprintf(stderr, "write request failed\n");
831 xseg_put_request(peer->xseg, req, pr->portno);
832 map->flags &= ~MF_MAP_WRITING;
833 XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
/* Snapshot the queue length so re-queued prs are not re-processed here. */
834 uint64_t qsize = xq_count(&map->pending);
835 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
837 struct peer_req *preq = (struct peer_req *) idx;
838 my_dispatch(peer, preq, preq->req);
/* --- failure path --- */
844 XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
845 xseg_put_request(peer->xseg, req, pr->portno);
846 map->flags &= ~MF_MAP_WRITING;
847 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
848 struct peer_req *preq = (struct peer_req *) idx;
851 remove_map(mapper, map);
852 //FIXME not freeing up all objects + object hash
/* --- map not found for the reply's target: log and drop the request --- */
857 strncpy(buf, target, req->targetlen);
858 buf[req->targetlen] = 0;
859 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
860 xseg_put_request(peer->xseg, req, pr->portno);
864 static int handle_clone(struct peerd *peer, struct peer_req *pr,
865 struct xseg_request *req)
/* Handle an X_CLONE request: create a new map (the clone) from an existing
 * base map.
 * Re-entered on the map-write reply (req->op == X_WRITE while mio->state is
 * WRITING). First pass: load the base map, build an in-memory clone map whose
 * entries either reference the base's objects or the zero block, insert it in
 * the cache, and submit the clone map write via map_write. */
867 struct mapperd *mapper = __get_mapperd(peer);
868 struct mapper_io *mio = __get_mapper_io(pr);
871 char buf[XSEG_MAX_TARGETLEN + 1];
874 if (pr->req->op != X_CLONE) {
876 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
/* Reply path: this is the completion of the clone map's write. */
881 if (req->op == X_WRITE){
882 //assert state = WRITING;
883 r = handle_mapwrite(peer, pr ,req);
885 XSEGLOG2(&lc, E, "handle mapwrite returned error");
891 if (mio->state == WRITING) {
892 target = xseg_get_target(peer->xseg, pr->req);
893 strncpy(buf, target, req->targetlen);
894 buf[req->targetlen] = 0;
895 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
/* First pass: resolve the base map named in the clone payload. */
900 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
905 r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
909 else if (r == MF_PENDING)
912 if (map->flags & MF_MAP_DESTROYED) {
913 strncpy(buf, xclone->target, xclone->targetlen);
914 buf[xclone->targetlen] = 0;
915 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
916 target = xseg_get_target(peer->xseg, pr->req);
917 strncpy(buf, target, req->targetlen);
918 buf[req->targetlen] = 0;
919 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
/* Allocate the clone map and its supporting structures. */
924 struct map *clonemap = malloc(sizeof(struct map));
929 FIXME check if clone map exists
930 find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
931 ... (on destroyed what ??
933 target = xseg_get_target(peer->xseg, pr->req);
934 strncpy(buf, target, req->targetlen);
935 buf[req->targetlen] = 0;
936 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
941 //alloc and init struct map
942 clonemap->objects = xhash_new(3, INTEGER);
943 if (!clonemap->objects){
944 goto out_err_clonemap;
946 xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
948 goto out_err_objhash;
/* Requested clone size must not shrink the base image. */
950 if (xclone->size < map->size) {
951 target = xseg_get_target(peer->xseg, pr->req);
952 strncpy(buf, target, req->targetlen);
953 buf[req->targetlen] = 0;
954 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
955 "\n\t for requested clone %s",
956 (unsigned long long) xclone->size,
957 (unsigned long long) map->size, buf);
/* size == -1 means "same size as the base map". */
960 if (xclone->size == -1)
961 clonemap->size = map->size;
963 clonemap->size = xclone->size;
965 target = xseg_get_target(peer->xseg, pr->req);
966 strncpy(clonemap->volume, target, pr->req->targetlen);
967 clonemap->volumelen = pr->req->targetlen;
968 clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
970 //alloc and init map_nodes
971 unsigned long c = clonemap->size/block_size + 1;
972 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
/* Entries inside the base map reference its objects (copy-on-write);
 * entries past the base map's end reference the zero block. */
977 for (i = 0; i < clonemap->size/block_size + 1; i++) {
978 struct map_node *mn = find_object(map, i);
980 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
981 map_nodes[i].objectlen = mn->objectlen;
983 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
984 map_nodes[i].objectlen = strlen(zero_block);
986 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
987 map_nodes[i].flags = 0;
988 map_nodes[i].objectidx = i;
989 map_nodes[i].map = clonemap;
990 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
991 r = insert_object(clonemap, &map_nodes[i]);
997 r = insert_map(mapper, clonemap);
999 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
/* Persist the clone map; completion re-enters this handler via X_WRITE. */
1002 r = map_write(peer, pr, clonemap);
1004 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1007 else if (r == MF_PENDING) {
1008 //maybe move this to map_write
1009 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1010 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1011 mio->state = WRITING;
1015 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
/* --- error unwinding: evict then free what was allocated --- */
1022 remove_map(mapper, clonemap);
1024 //FIXME not freeing allocated queues of map_nodes
1027 xq_free(&clonemap->pending);
1029 xhash_free(clonemap->objects);
1033 target = xseg_get_target(peer->xseg, pr->req);
1034 strncpy(buf, target, req->targetlen);
1035 buf[req->targetlen] = 0;
1036 XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
1041 static int req2objs(struct peerd *peer, struct peer_req *pr,
1042 struct map *map, int write)
/* Translate the byte range of pr->req into a scatter list of (object, offset,
 * size) segments and place it in the request as an xseg_reply_map.
 * For writes, any object that is busy (MF_OBJECT_NOT_READY) or does not yet
 * exist triggers a copy-up: pr is parked on the object's pending queue
 * (out_object_copying) and the translation is retried when the object becomes
 * ready. */
1044 char *target = xseg_get_target(peer->xseg, pr->req);
1045 uint32_t nr_objs = calc_nr_obj(pr->req);
1046 uint64_t size = sizeof(struct xseg_reply_map) +
1047 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1049 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1050 /* resize request to fit reply */
/* The resize may move the buffers, so preserve the target name around it. */
1051 char buf[XSEG_MAX_TARGETLEN];
1052 strncpy(buf, target, pr->req->targetlen);
1053 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1055 XSEGLOG2(&lc, E, "Cannot resize request");
1058 target = xseg_get_target(peer->xseg, pr->req);
1059 strncpy(target, buf, pr->req->targetlen);
1061 /* structure reply */
1062 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1063 reply->cnt = nr_objs;
/* First (possibly partial) object of the range. */
1066 uint64_t rem_size = pr->req->size;
1067 uint64_t obj_index = pr->req->offset / block_size;
1068 uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1069 uint64_t obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1070 struct map_node * mn = find_object(map, obj_index);
1072 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1075 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1076 goto out_object_copying;
1077 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1078 //calc new_target, copy up object
1079 r = copyup_object(peer, mn, pr);
1081 XSEGLOG2(&lc, E, "Error in copy up object");
1084 goto out_object_copying;
1087 // XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n",
1088 // (unsigned long long) pr->req->offset,
1089 // (unsigned long long) pr->req->size,
1091 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1092 reply->segs[idx].targetlen = mn->objectlen;
1093 reply->segs[idx].offset = obj_offset;
1094 reply->segs[idx].size = obj_size;
1095 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1096 // (unsigned long long) reply->segs[idx].size,
1097 // (unsigned long long) reply->segs[idx].offset);
1098 rem_size -= obj_size;
/* Remaining objects are block-aligned (offset 0 within each object). */
1099 while (rem_size > 0) {
1103 obj_size = (rem_size > block_size) ? block_size : rem_size;
1104 rem_size -= obj_size;
1105 mn = find_object(map, obj_index);
1107 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1110 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1111 goto out_object_copying;
1112 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1113 //calc new_target, copy up object
1114 r = copyup_object(peer, mn, pr);
1116 XSEGLOG2(&lc, E, "Error in copy up object");
1119 goto out_object_copying;
1121 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1122 reply->segs[idx].targetlen = mn->objectlen;
1123 reply->segs[idx].offset = obj_offset;
1124 reply->segs[idx].size = obj_size;
1125 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1126 // (unsigned long long) reply->segs[idx].size,
1127 // (unsigned long long) reply->segs[idx].offset);
/* Sanity check: segment count must match the pre-computed reply->cnt. */
1129 if (reply->cnt != (idx + 1)){
1130 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
/* --- object busy/copying: park pr on the object's pending queue --- */
1137 //printf("r2o mn: %lx\n", mn);
1138 //printf("volume %s pending on %s\n", map->volume, mn->object);
1140 if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1141 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1142 XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1143 mn->object, (unsigned long long) mn->objectidx, map->volume);
1151 static int handle_mapr(struct peerd *peer, struct peer_req *pr,
1152 struct xseg_request *req)
/* Handle an X_MAPR (map-read) request: resolve the map for pr->req's target,
 * then translate the byte range into a scatter-list reply via req2objs
 * (write = 0, so no copy-ups are triggered). Destroyed maps are rejected. */
1154 struct mapperd *mapper = __get_mapperd(peer);
1155 struct mapper_io *mio = __get_mapper_io(pr);
1159 char *target = xseg_get_target(peer->xseg, pr->req);
1161 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1166 else if (r == MF_PENDING)
1169 if (map->flags & MF_MAP_DESTROYED) {
1175 r = req2objs(peer, pr, map, 0);
1177 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1179 (unsigned long long) pr->req->offset,
1180 (unsigned long long) (pr->req->offset + pr->req->size));
1184 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1186 (unsigned long long) pr->req->offset,
1187 (unsigned long long) (pr->req->offset + pr->req->size));
1188 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1189 (unsigned long long) req->offset,
1190 (unsigned long long) req->size);
/* Debug-dump the reply segments. */
1191 char buf[XSEG_MAX_TARGETLEN+1];
1192 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1194 for (i = 0; i < reply->cnt; i++) {
1195 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1196 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1197 buf[reply->segs[i].targetlen] = 0;
1198 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1199 (unsigned long long) reply->segs[i].offset,
1200 (unsigned long long) reply->segs[i].size);
1209 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1210 struct xseg_request *req)
/* Completion handler for an X_COPY submitted by copyup_object.
 * Recover the map_node via the req association, clear MF_OBJECT_COPYING, and
 * on success write the updated map entry (new object name, MF_OBJECT_EXIST)
 * via object_write, marking the node MF_OBJECT_WRITING.
 * On failure: release the request, drop the association, and drain the
 * object's pending queue (presumably failing those prs). */
1212 struct mapperd *mapper = __get_mapperd(peer);
1214 struct mapper_io *mio = __get_mapper_io(pr);
1217 struct map_node *mn = __get_copyup_node(mio, req);
1221 mn->flags &= ~MF_OBJECT_COPYING;
1222 if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1223 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1226 struct map *map = mn->map;
1228 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1232 /* construct a tmp map_node for writing purposes */
/* The copy's request target carries the new object name. */
1233 char *target = xseg_get_target(peer->xseg, req);
1234 struct map_node newmn = *mn;
1235 newmn.flags = MF_OBJECT_EXIST;
1236 strncpy(newmn.object, target, req->targetlen);
1237 newmn.object[req->targetlen] = 0;
1238 newmn.objectlen = req->targetlen;
1239 newmn.objectidx = mn->objectidx;
1240 r = object_write(peer, pr, map, &newmn);
1241 if (r != MF_PENDING){
1242 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1243 "\n\t of map %s [%llu]",
1244 mn->object, map->volume, (unsigned long long) mn->objectidx);
1247 mn->flags |= MF_OBJECT_WRITING;
1248 xseg_put_request(peer->xseg, req, pr->portno);
1249 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/* --- failure path: release req, drop association, drain waiters --- */
1253 xseg_put_request(peer->xseg, req, pr->portno);
1254 __set_copyup_node(mio, req, NULL);
1255 while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1256 struct peer_req * preq = (struct peer_req *) idx;
1262 XSEGLOG2(&lc, E, "Cannot get map node");
1266 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1267 struct xseg_request *req)
/* Completion handler for the per-entry map update submitted by object_write.
 * On success: decode the written entry back out of the request data, commit
 * the new object name into the live map_node, clear MF_OBJECT_WRITING, and
 * re-dispatch the prs that were waiting on this object.
 * On failure: drain the object's pending queue (presumably failing those prs). */
1270 struct mapperd *mapper = __get_mapperd(peer);
1271 struct mapper_io *mio = __get_mapper_io(pr);
1272 //assert req->op = X_WRITE;
1273 char *target = xseg_get_target(peer->xseg, req);
1276 //printf("handle object write replyi\n");
1277 struct map_node *mn = __get_copyup_node(mio, req);
1281 __set_copyup_node(mio, req, NULL);
1283 //assert mn->flags & MF_OBJECT_WRITING
1284 mn->flags &= ~MF_OBJECT_WRITING;
1285 if (req->state & XS_FAILED)
/* Decode the entry we just wrote and fold it into the live node. */
1288 struct map_node tmp;
1289 char *data = xseg_get_data(peer->xseg, req);
1290 map_to_object(&tmp, data);
1291 mn->flags |= MF_OBJECT_EXIST;
/* After a successful copy-up + write, EXIST must be the only flag left. */
1292 if (mn->flags != MF_OBJECT_EXIST){
1293 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1296 //assert mn->flags & MF_OBJECT_EXIST
1297 strncpy(mn->object, tmp.object, tmp.objectlen);
1298 mn->object[tmp.objectlen] = 0;
1299 mn->objectlen = tmp.objectlen;
1300 xseg_put_request(peer->xseg, req, pr->portno);
1302 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
/* Snapshot the queue length so re-queued prs are not re-processed here. */
1303 uint64_t qsize = xq_count(&mn->pending);
1304 while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1306 struct peer_req * preq = (struct peer_req *) idx;
1307 my_dispatch(peer, preq, preq->req);
/* --- failure path: release req and drain waiters --- */
1312 XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1313 xseg_put_request(peer->xseg, req, pr->portno);
1314 while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1315 struct peer_req *preq = (struct peer_req *) idx;
/* --- no node associated with this request: unrecoverable --- */
1321 XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1322 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Handle an X_MAPW request (map-for-write): translate the request's
 * offset/size range into object segments via req2objs(..., write=1).
 * Replies arriving here that are X_COPY or X_WRITE are completions of
 * copy-ups / object writes triggered by an earlier mapw, and are routed
 * to their dedicated handlers first.
 * NOTE(review): interior lines are elided; the I-level log text says
 * "Map r" although this is the mapw path — possibly copy-pasted from
 * handle_mapr.  Left untouched (runtime string); confirm intent.
 */
1326 static int handle_mapw(struct peerd *peer, struct peer_req *pr,
1327 struct xseg_request *req)
1329 struct mapperd *mapper = __get_mapperd(peer);
1330 struct mapper_io *mio = __get_mapper_io(pr);
1333 /* handle copy up replies separately */
1334 if (req->op == X_COPY){
1335 if (handle_copyup(peer, pr, req) < 0){
1336 XSEGLOG2(&lc, E, "Handle copy up returned error");
1343 else if(req->op == X_WRITE){
1344 /* handle replies of object write operations */
1345 if (handle_objectwrite(peer, pr, req) < 0) {
1346 XSEGLOG2(&lc, E, "Handle object write returned error");
/* fresh mapw: locate (or load) the map for the request target */
1354 char *target = xseg_get_target(peer->xseg, pr->req);
1356 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1361 else if (r == MF_PENDING)
/* a destroyed map can no longer be mapped for writing */
1364 if (map->flags & MF_MAP_DESTROYED) {
/* translate the byte range into objects; 1 == write mapping */
1369 r = req2objs(peer, pr, map, 1);
1371 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1373 (unsigned long long) pr->req->offset,
1374 (unsigned long long) (pr->req->offset + pr->req->size));
1378 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1380 (unsigned long long) pr->req->offset,
1381 (unsigned long long) (pr->req->offset + pr->req->size));
1382 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1383 (unsigned long long) req->offset,
1384 (unsigned long long) req->size);
/* debug dump of the reply segments placed in the request data */
1385 char buf[XSEG_MAX_TARGETLEN+1];
1386 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1388 for (i = 0; i < reply->cnt; i++) {
1389 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1390 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1391 buf[reply->segs[i].targetlen] = 0;
1392 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1393 (unsigned long long) reply->segs[i].offset,
1394 (unsigned long long) reply->segs[i].size);
1398 //else copyup pending, wait for pr restart
/*
 * Snapshot request handler.
 * NOTE(review): the body (original lines ~1405-1409) is not visible in this
 * view; the dispatcher's X_SNAPSHOT case is commented out, so this appears
 * unused/unimplemented — confirm before relying on it.
 */
1403 static int handle_snap(struct peerd *peer, struct peer_req *pr,
1404 struct xseg_request *req)
/*
 * Handle an X_INFO request: look up (or asynchronously load) the map for
 * the request target and reply with the volume size placed in an
 * xseg_reply_info inside the request's data buffer.
 */
1410 static int handle_info(struct peerd *peer, struct peer_req *pr,
1411 struct xseg_request *req)
1413 struct mapperd *mapper = __get_mapperd(peer);
1414 struct mapper_io *mio = __get_mapper_io(pr);
1417 char *target = xseg_get_target(peer->xseg, pr->req);
1422 //printf("Handle info\n");
/* may complete immediately or return MF_PENDING (pr restarted later) */
1424 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1429 else if (r == MF_PENDING)
/* destroyed maps cannot serve info requests */
1431 if (map->flags & MF_MAP_DESTROYED) {
1436 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1437 xinfo->size = map->size;
/*
 * Submit an X_DELETE(?) request for a single object to the data blocker
 * (mapper->bportno).  If the object already has pending peer_reqs, queue pr
 * behind them instead of issuing a new request.  On submit success the node
 * is marked MF_OBJECT_DELETING; the caller is expected to treat this as
 * MF_PENDING.  On any failure the request is released and an error is
 * reported.
 * NOTE(review): the line setting req->op is elided in this view, so the
 * exact xseg op cannot be confirmed from here.
 */
1443 static int delete_object(struct peerd *peer, struct peer_req *pr,
1444 struct map_node *mn)
1447 struct mapperd *mapper = __get_mapperd(peer);
1448 struct mapper_io *mio = __get_mapper_io(pr);
/* remember which object index this deletion pass is on */
1450 mio->delobj = mn->objectidx;
1451 if (xq_count(&mn->pending) != 0) {
1452 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1453 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
/* allocate and prepare a request targeted at the data blocker */
1458 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1459 mapper->bportno, X_ALLOC);
1462 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1465 char *target = xseg_get_target(peer->xseg, req);
1466 strncpy(target, mn->object, req->targetlen);
1468 req->size = req->datalen;
/* stash pr so the reply can be routed back to this peer_req */
1471 r = xseg_set_req_data(peer->xseg, req, pr);
/* remember which node this in-flight request refers to */
1474 __set_copyup_node(mio, req, mn);
1475 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1478 r = xseg_signal(peer->xseg, p);
1479 mn->flags |= MF_OBJECT_DELETING;
1480 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
/* error path: detach req data and release the request */
1484 xseg_get_req_data(peer->xseg, req, &dummy);
1486 xseg_put_request(peer->xseg, req, pr->portno);
1488 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1493 * Find the next object for deletion. Start searching at idx mio->delobj.
1494 * Skip non-existing map_nodes (freeing their resources) and non-existing objects.
1495 * Wait for all pending operations on the object, before moving forward to the
1498 * Return MF_PENDING if there is a pending operation on the next object,
1499 * or zero if there is no next object.
/*
 * Walk the map's objects starting at mio->delobj and issue the next object
 * deletion.  Nodes with pending requests make pr wait (MF_PENDING); nodes
 * without MF_OBJECT_EXIST are skipped after freeing their pending queue.
 * Returns MF_PENDING while a deletion/wait is outstanding, 0 when there is
 * nothing left to delete (per the comment block above this function).
 * NOTE(review): interior lines (returns, loop increments) are elided in
 * this view.
 */
1501 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
1504 struct mapperd *mapper = __get_mapperd(peer);
1505 struct mapper_io *mio = __get_mapper_io(pr);
/* resume the scan from where the previous pass stopped */
1506 uint64_t idx = mio->delobj;
1507 struct map_node *mn;
1510 while (idx < calc_map_obj(map)) {
1511 mn = find_object(map, idx);
/* wait for in-flight ops on this object before deleting it */
1517 if (xq_count(&mn->pending) != 0) {
1518 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1519 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1523 if (mn->flags & MF_OBJECT_EXIST){
1524 r = delete_object(peer, pr, mn);
1526 /* on error, just log it, release resources and
1527 * proceed to the next object
1529 XSEGLOG2(&lc, E, "Object %s delete object return error"
1530 "\n\t Map: %s [%llu]",
1531 mn->object, mn->map->volume,
1532 (unsigned long long) mn->objectidx);
1533 xq_free(&mn->pending);
1535 else if (r == MF_PENDING){
/* non-existing object: just release its queue and move on */
1539 xq_free(&mn->pending);
/*
 * Completion handler for a single object deletion during a map delete.
 * Clears the node's DELETING flag, frees its resources, and continues with
 * delete_next_object().  When no further objects remain, the map is marked
 * destroyed, all peer_reqs still queued on it are restarted (to fail), and
 * the map's resources are released.
 * NOTE(review): interior lines are elided in this view.
 */
1546 static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
1547 struct map_node *mn, int err)
1549 struct mapperd *mapper = __get_mapperd(peer);
1550 struct mapper_io *mio = __get_mapper_io(pr);
1552 struct map *map = mn->map;
//if object deletion failed, map deletion must continue
//and report OK, since map block has been deleted successfully
//so, no check for err
1559 //assert object flags OK
1560 //free map_node_resources
1561 mn->flags &= ~MF_OBJECT_DELETING;
1562 xq_free(&mn->pending);
/* move on to the next object of the map */
1565 r = delete_next_object(peer, pr, map);
1566 if (r != MF_PENDING){
1567 /* if there is no next object to delete, remove the map block
1571 //assert map flags OK
1572 map->flags |= MF_MAP_DESTROYED;
1573 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1574 //make all pending requests on map to fail
/* snapshot the count so re-queued entries are not re-popped */
1575 uint64_t qsize = xq_count(&map->pending);
1576 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1578 struct peer_req * preq = (struct peer_req *) idx;
1579 my_dispatch(peer, preq, preq->req);
1581 //free map resources;
1582 remove_map(mapper, map);
/* presumably iterating/freeing remaining map_nodes here — elided */
1583 mn = find_object(map, 0);
1585 xq_free(&map->pending);
1588 XSEGLOG2(&lc, I, "Handle object delete OK");
/*
 * Submit deletion of the map block itself to the map blocker
 * (mapper->mbportno).  On submit success the map is flagged
 * MF_MAP_DELETING (treated as MF_PENDING by callers); on failure the
 * request is released and an error is reported.
 * NOTE(review): the line setting req->op is elided in this view.
 */
1592 static int delete_map(struct peerd *peer, struct peer_req *pr,
1596 struct mapperd *mapper = __get_mapperd(peer);
1597 struct mapper_io *mio = __get_mapper_io(pr);
1598 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1599 mapper->mbportno, X_ALLOC);
1602 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1605 char *target = xseg_get_target(peer->xseg, req);
1606 strncpy(target, map->volume, req->targetlen);
1608 req->size = req->datalen;
/* route the reply back to this peer_req */
1611 r = xseg_set_req_data(peer->xseg, req, pr);
/* NULL copyup node distinguishes a map delete from an object delete
 * in handle_delete() */
1614 __set_copyup_node(mio, req, NULL);
1615 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1618 r = xseg_signal(peer->xseg, p);
1619 map->flags |= MF_MAP_DELETING;
1620 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
/* error path: detach req data and release the request */
1624 xseg_get_req_data(peer->xseg, req, &dummy);
1626 xseg_put_request(peer->xseg, req, pr->portno);
1628 XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
/*
 * Completion handler for the map-block deletion.  On failure: clear
 * MF_MAP_DELETING, log, and restart every peer_req pending on the map.
 * On success: mark the map destroyed and start deleting its objects via
 * delete_next_object(); if nothing is pending, fail all waiters and free
 * the map's resources.
 * NOTE(review): interior lines are elided in this view; this success path
 * mirrors the tail of handle_object_delete().
 */
1632 static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
1633 struct map *map, int err)
1635 struct mapperd *mapper = __get_mapperd(peer);
1636 struct mapper_io *mio = __get_mapper_io(pr);
1640 map->flags &= ~MF_MAP_DELETING;
1642 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1643 //dispatch all pending
1644 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1645 struct peer_req * preq = (struct peer_req *) idx;
1646 my_dispatch(peer, preq, preq->req);
/* map block gone: mark destroyed, then delete the data objects */
1649 map->flags |= MF_MAP_DESTROYED;
1650 //delete all objects
1651 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1653 r = delete_next_object(peer, pr, map);
1654 if (r != MF_PENDING){
1655 /* if there is no next object to delete, remove the map block
1658 //assert map flags OK
1659 map->flags |= MF_MAP_DESTROYED;
1660 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1661 //make all pending requests on map to fail
/* snapshot the count so re-queued entries are not re-popped */
1662 uint64_t qsize = xq_count(&map->pending);
1663 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1665 struct peer_req * preq = (struct peer_req *) idx;
1666 my_dispatch(peer, preq, preq->req);
1668 //free map resources;
1669 remove_map(mapper, map);
/* presumably iterating/freeing remaining map_nodes here — elided */
1670 struct map_node *mn = find_object(map, 0);
1673 xq_free(&map->pending);
/*
 * Route a deletion reply to the proper sub-handler.  The stashed copyup
 * node disambiguates: NULL means the reply is for the map block itself
 * (-> handle_map_delete, map found by request target), non-NULL means a
 * single object deletion (-> handle_object_delete).  The xseg request is
 * released on every path.
 * NOTE(review): interior lines are elided in this view.
 */
1680 static int handle_delete(struct peerd *peer, struct peer_req *pr,
1681 struct xseg_request *req)
1683 struct mapperd *mapper = __get_mapperd(peer);
1684 struct mapper_io *mio = __get_mapper_io(pr);
1685 struct map_node *mn;
/* failed and not served at all -> err (propagated to sub-handlers) */
1688 if (req->state & XS_FAILED && !(req->state &XS_SERVED))
1691 mn = __get_copyup_node(mio, req);
1692 __set_copyup_node(mio, req, NULL);
1693 char *target = xseg_get_target(peer->xseg, req);
/* map-block deletion reply: locate the map by the request target */
1696 map = find_map(mapper, target, req->targetlen);
1698 xseg_put_request(peer->xseg, req, pr->portno);
1701 handle_map_delete(peer, pr, map, err);
/* object deletion reply */
1706 xseg_put_request(peer->xseg, req, pr->portno);
1709 handle_object_delete(peer, pr, mn, err);
1711 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Handle an X_DELETE (destroy volume) request.  Replies from earlier
 * deletion sub-requests (pr->req != req, op X_DELETE) are forwarded to
 * handle_delete().  A fresh request loads the map, then either starts
 * deleting the map block (delete_map) or, when mio->state == DELETING,
 * continues with the remaining objects (delete_next_object).
 * NOTE(review): interior lines are elided in this view.
 */
1715 static int handle_destroy(struct peerd *peer, struct peer_req *pr,
1716 struct xseg_request *req)
1718 struct mapperd *mapper = __get_mapperd(peer);
1719 struct mapper_io *mio = __get_mapper_io(pr);
1722 char buf[XSEG_MAX_TARGETLEN+1];
1723 char *target = xseg_get_target(peer->xseg, pr->req);
/* NOTE(review): copy length uses pr->req->targetlen but the terminator
 * index uses req->targetlen — these differ whenever pr->req != req,
 * which can leave buf unterminated/mis-terminated.  Confirm intent. */
1725 strncpy(buf, target, pr->req->targetlen);
1726 buf[req->targetlen] = 0;
1728 XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1729 (unsigned long) pr, (unsigned long) pr->req,
1730 (unsigned long) req);
1731 XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
/* reply from a deletion sub-request issued by this pr */
1732 if (pr->req != req && req->op == X_DELETE) {
1733 //assert mio->state == DELETING
1734 r = handle_delete(peer, pr, req);
1736 XSEGLOG2(&lc, E, "Handle delete returned error");
1745 r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1750 else if (r == MF_PENDING)
1752 if (map->flags & MF_MAP_DESTROYED) {
1753 if (mio->state == DELETING){
1754 XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1758 XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1763 if (mio->state == DELETING) {
1764 //continue deleting map objects;
1765 r = delete_next_object(peer ,pr, map);
1766 if (r != MF_PENDING){
/* first pass: delete the map block itself */
1772 r = delete_map(peer, pr, map);
1774 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1777 } else if (r == MF_PENDING) {
/* wait until the map-block deletion completes */
1778 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1779 __xq_append_tail(&map->pending, (xqindex) pr);
1780 mio->state = DELETING;
1784 XSEGLOG2(&lc, E, "Destroy unreachable");
/*
 * Handle an X_CLOSE request: evict a map from the in-memory cache.
 * The map is flagged MF_MAP_DROPPING_CACHE to block future operations,
 * then every object's pending queue is drained/freed starting at
 * mio->dcobj (the resume point when pr is restarted); objects with
 * in-flight requests make pr wait.  Finally the map is removed from the
 * hash, its waiters are restarted, and its resources are freed.
 * NOTE(review): interior lines are elided in this view.
 */
1789 static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
1790 struct xseg_request *req)
1792 struct mapperd *mapper = __get_mapperd(peer);
1793 struct mapper_io *mio = __get_mapper_io(pr);
1796 char *target = xseg_get_target(peer->xseg, pr->req);
/* only cached maps can be dropped; no loading is triggered here */
1802 struct map *map = find_map(mapper, target, pr->req->targetlen);
1806 } else if (map->flags & MF_MAP_DESTROYED) {
/* busy map: wait, unless this pr is already mid-drop */
1809 } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1810 __xq_append_tail(&map->pending, (xqindex) pr);
1814 if (mio->state != DROPPING_CACHE) {
1815 /* block all future operations on the map */
1816 map->flags |= MF_MAP_DROPPING_CACHE;
1818 mio->state = DROPPING_CACHE;
1819 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1821 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1824 struct map_node *mn;
/* resume the per-object sweep at the saved index */
1826 for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1827 mn = find_object(map, i);
1831 if (xq_count(&mn->pending) != 0){
1832 XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu",
1833 map->volume, (unsigned long long) mn->objectidx);
1834 __xq_append_tail(&mn->pending, (xqindex) pr);
1837 xq_free(&mn->pending);
1838 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu",
1839 map->volume, (unsigned long long) mn->objectidx);
/* all objects drained: unhash the map and restart its waiters */
1841 remove_map(mapper, map);
1843 uint64_t qsize = xq_count(&map->pending);
1844 while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1846 struct peer_req * preq = (struct peer_req *) i;
1847 my_dispatch(peer, preq, preq->req);
1849 XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1851 //free map resources;
1852 mn = find_object(map, 0);
1855 xq_free(&map->pending);
/*
 * Internal dispatcher: route a request (or a reply to a sub-request) to
 * the proper handler.  X_READ replies are map-block reads; everything
 * else is dispatched on the ORIGINAL op (pr->req->op), so replies of
 * sub-requests reach the handler that issued them.
 */
1863 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1865 struct mapperd *mapper = __get_mapperd(peer);
1867 struct mapper_io *mio = __get_mapper_io(pr);
1870 if (req->op == X_READ) {
1871 /* catch map reads requests here */
1872 handle_mapread(peer, pr, req);
1876 switch (pr->req->op) {
1877 /* primary xseg operations of mapper */
1878 case X_CLONE: handle_clone(peer, pr, req); break;
1879 case X_MAPR: handle_mapr(peer, pr, req); break;
1880 case X_MAPW: handle_mapw(peer, pr, req); break;
1881 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1882 case X_INFO: handle_info(peer, pr, req); break;
1883 case X_DELETE: handle_destroy(peer, pr, req); break;
1884 case X_CLOSE: handle_dropcache(peer, pr, req); break;
/* NOTE(review): "unknown up" looks like a typo for "unknown op";
 * runtime string left untouched */
1885 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/*
 * Public peerd dispatch entry point: reset the per-peer_req mapper state
 * to ACCEPTED for a newly accepted request, then hand off to the internal
 * dispatcher.
 * NOTE(review): interior lines are elided; the accepted-vs-reply check
 * that presumably guards the state reset is not visible here.
 */
1890 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1892 struct mapperd *mapper = __get_mapperd(peer);
1894 struct mapper_io *mio = __get_mapper_io(pr);
1898 mio->state = ACCEPTED;
1899 my_dispatch(peer, pr ,req);
/*
 * Peer initialization: set up libgcrypt (thread callbacks, version check,
 * no secure memory), precompute the magic-string sha256 and the hexlified
 * hash of an all-zero block, allocate the mapperd state and one mapper_io
 * per peer_req, then parse -bp/-mbp/-t arguments.
 * NOTE(review): mallocs and xhash_new results are unchecked (existing
 * FIXME); `zero` does not appear to be freed in the visible lines.
 */
1903 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1906 unsigned char buf[SHA256_DIGEST_SIZE];
/* libgcrypt thread callbacks must be set before any other gcry call */
1909 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1911 /* Version check should be the very first call because it
1912 makes sure that important subsystems are initialized. */
1913 gcry_check_version (NULL);
1915 /* Disable secure memory. */
1916 gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1918 /* Tell Libgcrypt that initialization has completed. */
1919 gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1921 /* calculate our magic sha hash value */
1922 gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1924 /* calculate zero block */
1925 //FIXME check hash value
1926 zero = malloc(block_size);
1927 memset(zero, 0, block_size);
1928 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
/* hexlify the digest into the global zero_block string */
1929 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1930 sprintf(zero_block + 2*i, "%02x", buf[i]);
1931 printf("%s \n", zero_block);
1934 //FIXME error checks
1935 struct mapperd *mapper = malloc(sizeof(struct mapperd));
1936 mapper->hashmaps = xhash_new(3, STRING);
1937 peer->priv = mapper;
/* one mapper_io per preallocated peer_req */
1939 for (i = 0; i < peer->nr_ops; i++) {
1940 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1941 mio->copyups_nodes = xhash_new(3, INTEGER);
1944 peer->peer_reqs[i].priv = mio;
/* argument parsing: -bp data blocker port, -mbp map blocker port */
1947 for (i = 0; i < argc; i++) {
1948 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1949 mapper->bportno = atoi(argv[i+1]);
1953 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1954 mapper->mbportno = atoi(argv[i+1]);
1958 /* enforce only one thread */
1959 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1960 int t = atoi(argv[i+1]);
1962 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1970 const struct sched_param param = { .sched_priority = 99 };
/* NOTE(review): "¶m" appears to be mojibake for "&param" ("&para"
 * entity-encoded); also sched_setscheduler takes a pid_t while
 * syscall(SYS_gettid) returns a long (Linux accepts a tid here, but the
 * call is unportable) — confirm and repair against the real source */
1971 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);
/* Debug helper: print one map_node (index, object name, name length,
 * and whether the object exists) to stderr. */
1979 void print_obj(struct map_node *mn)
1981 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
1982 (unsigned long long) mn->objectidx, mn->object,
1983 (unsigned int) mn->objectlen,
1984 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * Debug helper: print a map's header (volume name/length, size, number of
 * objects — size rounded up to whole blocks) and then every object entry.
 * Bails out on absurd object counts to guard against corrupt sizes.
 */
1987 void print_map(struct map *m)
1989 uint64_t nr_objs = m->size/block_size;
/* round the object count up for a partial trailing block */
1990 if (m->size % block_size)
1992 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
1993 m->volume, m->volumelen,
1994 (unsigned long long) m->size,
1995 (unsigned long long) nr_objs);
1997 struct map_node *mn;
1998 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2000 for (i = 0; i < nr_objs; i++) {
2001 mn = find_object(m, i);
2003 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2011 void test_map(struct peerd *peer)
2014 //struct sha256_ctx sha256ctx;
2015 unsigned char buf[SHA256_DIGEST_SIZE];
2016 char buf_new[XSEG_MAX_TARGETLEN + 20];
2017 struct map *m = malloc(sizeof(struct map));
2018 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2019 m->volume[XSEG_MAX_TARGETLEN] = 0;
2020 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2021 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2022 m->volumelen = XSEG_MAX_TARGETLEN;
2023 m->size = 100*block_size;
2024 m->objects = xhash_new(3, INTEGER);
2025 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2026 for (i = 0; i < 100; i++) {
2027 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2028 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2030 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2031 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2033 map_node[i].objectidx = i;
2034 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2035 map_node[i].flags = MF_OBJECT_EXIST;
2036 ret = insert_object(m, &map_node[i]);
2039 char *data = malloc(block_size);
2040 mapheader_to_map(m, data);
2041 uint64_t pos = mapheader_size;
2043 for (i = 0; i < 100; i++) {
2044 map_node = find_object(m, i);
2046 printf("no object node %d \n", i);
2049 object_to_map(data+pos, map_node);
2050 pos += objectsize_in_map;
2054 struct map *m2 = malloc(sizeof(struct map));
2055 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2056 m->volume[XSEG_MAX_TARGETLEN] = 0;
2057 m->volumelen = XSEG_MAX_TARGETLEN;
2059 m2->objects = xhash_new(3, INTEGER);
2060 ret = read_map(peer, m2, data);
2063 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2065 while (sum < block_size) {
2066 r = write(fd, data + sum, block_size -sum);
2069 printf("write error\n");
2075 map_node = find_object(m, 0);