8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
/* Make libgcrypt safe to use from pthreads (required before gcry init). */
16 GCRY_THREAD_OPTION_PTHREAD_IMPL;
20 #define SHA256_DIGEST_SIZE 32
21 /* hex representation of sha256 value takes up double the sha256 size */
22 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
/* Fixed object/block size: 4 MiB. */
24 #define block_size (1<<22) //FIXME this should be defined here?
25 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
26 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
/* Per-object state flags (struct map_node.flags). */
28 #define MF_OBJECT_EXIST (1 << 0)
29 #define MF_OBJECT_COPYING (1 << 1)
30 #define MF_OBJECT_WRITING (1 << 2)
31 #define MF_OBJECT_DELETING (1 << 3)
/* An object with any in-flight operation must not be touched. */
33 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
34 extern struct log_ctx lc;
/* Known input hashed once at startup; its digest marks "our" (type 1) map files. */
36 char *magic_string = "This a magic string. Please hash me";
37 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
38 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
40 //internal mapper states
/* struct map_node: one object (chunk) of a volume map. */
53 char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
54 struct xq pending; /* pending peer_reqs on this object */
/* Per-map state flags (struct map.flags). */
58 #define MF_MAP_LOADING (1 << 0)
59 #define MF_MAP_DESTROYED (1 << 1)
60 #define MF_MAP_WRITING (1 << 2)
61 #define MF_MAP_DELETING (1 << 3)
62 #define MF_MAP_DROPPING_CACHE (1 << 4)
/* A map with any in-flight operation queues new requests instead of serving. */
64 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
65 MF_MAP_DROPPING_CACHE)
/* struct map: an in-memory volume map. */
71 char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
72 xhash_t *objects; /* obj_index --> map_node */
73 struct xq pending; /* pending peer_reqs on this map */
/* struct mapperd: peer-wide state (peer->priv). */
77 xport bportno; /* blocker that accesses data */
78 xport mbportno; /* blocker that accesses maps */
79 xhash_t *hashmaps; // hash_function(target) --> struct map
/* struct mapper_io: per-peer_req state (pr->priv), tracks in-flight copyups. */
83 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
84 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
85 struct map_node *copyup_node;
86 int err; /* error flag */
89 enum mapper_state state;
/* Forward declarations used by the handlers below. */
92 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
93 void print_map(struct map *m);
/* Return the mapper daemon state stored in the peer's private pointer. */
99 static inline struct mapperd * __get_mapperd(struct peerd *peer)
101 return (struct mapperd *) peer->priv;
/* Return the per-request mapper_io state stored in the peer_req's private pointer. */
104 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
106 return (struct mapper_io *) pr->priv;
/* Number of block_size objects needed to cover map->size (rounded up;
 * the increment for a partial last block is in an elided line). */
109 static inline uint64_t calc_map_obj(struct map *map)
111 uint64_t nr_objs = map->size / block_size;
112 if (map->size % block_size)
/* Count how many objects an I/O request touches, given that the request may
 * start at an arbitrary offset inside the first object. */
117 static uint32_t calc_nr_obj(struct xseg_request *req)
120 uint64_t rem_size = req->size;
121 uint64_t obj_offset = req->offset & (block_size -1); //modulo
/* First object: only from obj_offset to the end of that block. */
122 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
123 rem_size -= obj_size;
/* Remaining objects are consumed whole-block at a time (counter increment elided). */
124 while (rem_size > 0) {
125 obj_size = (rem_size > block_size) ? block_size : rem_size;
126 rem_size -= obj_size;
134 * Maps handling functions
/* Look up a map by (target, targetlen) in mapper->hashmaps.
 * Returns the map or NULL (return paths elided from this view). */
137 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
140 struct map *m = NULL;
141 char buf[XSEG_MAX_TARGETLEN+1];
142 //assert targetlen <= XSEG_MAX_TARGETLEN
/* NOTE(review): strncpy does not NUL-terminate; presumably an elided line sets
 * buf[targetlen] = 0 before the lookup/log below — confirm. */
143 strncpy(buf, target, targetlen);
145 XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
146 r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
/* Insert a map into mapper->hashmaps keyed by its volume name.
 * Fails if a map with the same name is already present.
 * On -XHASH_ERESIZE, grows the hash table and retries the insert. */
153 static int insert_map(struct mapperd *mapper, struct map *map)
157 if (find_map(mapper, map->volume, map->volumelen)){
158 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
162 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
163 map->volume, strlen(map->volume), (unsigned long) map);
164 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
165 while (r == -XHASH_ERESIZE) {
166 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
167 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
169 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
170 (unsigned long long) shift);
173 mapper->hashmaps = new_hashmap;
174 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Remove a map from mapper->hashmaps; on -XHASH_ERESIZE shrinks the table
 * and retries the delete. Caller must ensure no pending prs on the map. */
180 static int remove_map(struct mapperd *mapper, struct map *map)
184 //assert no pending pr on map
186 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
187 while (r == -XHASH_ERESIZE) {
188 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
189 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
191 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
192 (unsigned long long) shift);
195 mapper->hashmaps = new_hashmap;
196 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* Find a map by name, or allocate one in MF_MAP_LOADING state and issue an
 * asynchronous read of its on-disk block through the maps blocker (mbportno).
 * The caller's pr is appended to the map's pending queue and re-dispatched
 * when the read completes (see handle_mapread). Several error-path and
 * control-flow lines are elided from this view. */
203 static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
208 struct xseg_request *req;
209 struct mapperd *mapper = __get_mapperd(peer);
212 struct map *m = find_map(mapper, target, targetlen);
/* Not cached: build a fresh map object. */
214 m = malloc(sizeof(struct map));
216 XSEGLOG2(&lc, E, "Cannot allocate map ");
220 strncpy(m->volume, target, targetlen);
221 m->volume[targetlen] = 0;
222 m->volumelen = targetlen;
223 m->flags = MF_MAP_LOADING;
224 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
226 XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
230 m->objects = xhash_new(3, INTEGER);
232 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
/* Queue the requesting pr; it is re-dispatched when loading finishes. */
236 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
241 r = insert_map(mapper, m);
/* Issue the map-block read via the maps blocker. */
246 req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
248 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
253 r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
255 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
260 char *reqtarget = xseg_get_target(peer->xseg, req);
263 strncpy(reqtarget, target, req->targetlen);
265 req->size = block_size;
267 r = xseg_set_req_data(peer->xseg, req, pr);
269 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
273 p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
275 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
279 r = xseg_signal(peer->xseg, p);
281 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
/* Error unwinding: release the request, remove the map, fail queued prs,
 * and free the object hash and pending queue. */
285 xseg_get_req_data(peer->xseg, req, &dummy);
287 xseg_put_request(peer->xseg, req, peer->portno);
290 remove_map(mapper, m);
292 while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
293 fail(peer, (struct peer_req *) idx);
297 xhash_free(m->objects);
299 xq_free(&m->pending);
301 XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
/* Map already cached: either queue behind the in-progress load, or
 * dispatch immediately. */
307 //assert map loading when this is reached
308 if (m->flags & MF_MAP_LOADING) {
309 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
310 "Adding to pending queue", m->volume);
311 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
314 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
315 my_dispatch(peer, pr, pr->req);
/* Resolve a map by name. If cached and ready, return it in *m; if cached but
 * busy (MF_MAP_NOT_READY) queue pr on the map; otherwise start an async load.
 * Return codes (elided here) distinguish ready vs. pending vs. error. */
321 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
322 char *target, uint32_t targetlen, struct map **m)
324 struct mapperd *mapper = __get_mapperd(peer);
326 *m = find_map(mapper, target, targetlen);
328 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
329 if ((*m)->flags & MF_MAP_NOT_READY) {
330 __xq_append_tail(&(*m)->pending, (xqindex) pr);
331 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
333 //} else if ((*m)->flags & MF_MAP_DESTROYED){
337 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
/* Not cached: kick off an asynchronous load. */
341 r = load_map(peer, pr, target, targetlen);
348 * Object handling functions
/* Look up the map_node for object index obj_index in map->objects. */
351 struct map_node *find_object(struct map *map, uint64_t obj_index)
354 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert mn into map->objects keyed by its object index, growing the hash
 * once on -XHASH_ERESIZE before retrying. */
360 static int insert_object(struct map *map, struct map_node *mn)
362 //FIXME no find object first
363 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
364 if (r == -XHASH_ERESIZE) {
365 unsigned long shift = xhash_grow_size_shift(map->objects);
/* NOTE(review): xhash_resize result is assigned directly; a NULL check may
 * exist in an elided line — confirm before relying on it. */
366 map->objects = xhash_resize(map->objects, shift, NULL);
369 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
376 * map read/write functions
/* Fill mn from a pithos-style (type 0) map entry: buf holds a raw SHA-256
 * digest, which becomes the object's hexlified name; the object exists. */
378 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
381 //hexlify sha256 value
382 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
/* Each digest byte becomes two lowercase hex characters. */
383 sprintf(mn->object+2*i, "%02x", buf[i]);
386 mn->object[SHA256_DIGEST_SIZE * 2] = 0;
387 mn->objectlen = SHA256_DIGEST_SIZE * 2;
388 mn->flags = MF_OBJECT_EXIST;
/* Fill mn from an archipelago-style (type 1) map entry: buf[0] is the
 * existence ("transparency") byte, buf[1..] the object name.
 * NOTE(review): |= assumes mn->flags was initialized by the caller or by an
 * elided line — confirm (handle_objectwrite passes a stack-local tmp). */
391 static inline void map_to_object(struct map_node *mn, char *buf)
396 mn->flags |= MF_OBJECT_EXIST;
397 memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
398 mn->object[XSEG_MAX_TARGETLEN] = 0;
399 mn->objectlen = strlen(mn->object);
/* Serialize mn into one objectsize_in_map-sized map entry:
 * existence byte, object name, zero padding up to XSEG_MAX_TARGETLEN. */
402 static inline void object_to_map(char* buf, struct map_node *mn)
404 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
405 memcpy(buf+1, mn->object, mn->objectlen);
406 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
/* Serialize the map header: magic SHA-256 (type marker) followed by the
 * volume size — matches mapheader_size. */
409 static inline void mapheader_to_map(struct map *m, char *buf)
412 memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
413 pos += SHA256_DIGEST_SIZE;
414 memcpy(buf + pos, &m->size, sizeof(m->size));
415 pos += sizeof(m->size);
/* Asynchronously write a single serialized map entry (mn) at its slot in the
 * on-disk map of `map`, through the maps blocker. Returns MF_PENDING on
 * successful submission (per callers); error unwinding is at the bottom. */
419 static int object_write(struct peerd *peer, struct peer_req *pr,
420 struct map *map, struct map_node *mn)
423 struct mapperd *mapper = __get_mapperd(peer);
424 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
425 mapper->mbportno, X_ALLOC);
427 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
429 mn->object, map->volume, (unsigned long long) mn->objectidx);
432 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
/* NOTE(review): message says "allocate" but this is the prep failure —
 * likely a copy-paste of the message above. */
434 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
436 mn->object, map->volume, (unsigned long long) mn->objectidx);
439 char *target = xseg_get_target(peer->xseg, req);
440 strncpy(target, map->volume, req->targetlen);
441 req->size = objectsize_in_map;
/* Entry slot: header, then one fixed-size entry per object index. */
442 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
444 char *data = xseg_get_data(peer->xseg, req);
445 object_to_map(data, mn);
447 r = xseg_set_req_data(peer->xseg, req, pr);
449 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
451 mn->object, map->volume, (unsigned long long) mn->objectidx);
454 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
456 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
458 mn->object, map->volume, (unsigned long long) mn->objectidx);
461 r = xseg_signal(peer->xseg, p);
463 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
465 XSEGLOG2(&lc, I, "Writing object %s \n\t"
467 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Error path: detach pr from the request and release it. */
472 xseg_get_req_data(peer->xseg, req, &dummy);
474 xseg_put_request(peer->xseg, req, peer->portno);
476 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
478 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Asynchronously write the whole map (header + one entry per object) through
 * the maps blocker. Requires every object in [0, max_objidx) to be present
 * in map->objects. Sets MF_MAP_WRITING on successful submission. */
482 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
485 struct mapperd *mapper = __get_mapperd(peer);
487 uint64_t i, pos, max_objidx = calc_map_obj(map);
488 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
489 mapper->mbportno, X_ALLOC);
491 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
494 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
495 mapheader_size + max_objidx * objectsize_in_map);
497 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
500 char *target = xseg_get_target(peer->xseg, req);
501 strncpy(target, map->volume, req->targetlen);
502 char *data = xseg_get_data(peer->xseg, req);
/* Header first, then serialize each object entry in index order. */
503 mapheader_to_map(map, data);
504 pos = mapheader_size;
506 req->size = req->datalen;
509 if (map->size % block_size)
511 for (i = 0; i < max_objidx; i++) {
512 mn = find_object(map, i);
514 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
515 (unsigned long long) i, map->volume);
518 object_to_map(data+pos, mn);
519 pos += objectsize_in_map;
521 r = xseg_set_req_data(peer->xseg, req, pr);
523 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
527 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
529 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
533 r = xseg_signal(peer->xseg, p);
535 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
537 map->flags |= MF_MAP_WRITING;
538 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
/* Error path: detach pr and release the request. */
542 xseg_get_req_data(peer->xseg, req, &dummy);
544 xseg_put_request(peer->xseg, req, peer->portno);
546 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Parse a raw on-disk map block (buf) into in-memory map_nodes.
 * An all-zero header means an empty/nonexistent map. The first
 * SHA256_DIGEST_SIZE bytes select the format: equal to magic_sha256 ->
 * type 1 (our format, header carries the size), otherwise type 0 (legacy
 * pithos map: a packed list of raw SHA-256 digests, size inferred). */
550 static int read_map (struct peerd *peer, struct map *map, char *buf)
552 char nulls[SHA256_DIGEST_SIZE];
553 memset(nulls, 0, SHA256_DIGEST_SIZE);
555 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
560 //type 1, our type, type 0 pithos map
561 int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
562 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
565 struct map_node *map_node;
/* --- type 1: header = magic + uint64 size, then fixed-size entries --- */
567 pos = SHA256_DIGEST_SIZE;
568 map->size = *(uint64_t *) (buf + pos);
569 pos += sizeof(uint64_t);
570 nr_objs = map->size / block_size;
571 if (map->size % block_size)
573 map_node = calloc(nr_objs, sizeof(struct map_node));
577 for (i = 0; i < nr_objs; i++) {
578 map_node[i].map = map;
579 map_node[i].objectidx = i;
580 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
582 map_to_object(&map_node[i], buf + pos);
583 pos += objectsize_in_map;
584 r = insert_object(map, &map_node[i]); //FIXME error check
/* --- type 0: packed raw digests, terminated by an all-zero entry --- */
588 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
589 map_node = calloc(max_nr_objs, sizeof(struct map_node));
592 for (i = 0; i < max_nr_objs; i++) {
593 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
595 map_node[i].objectidx = i;
596 map_node[i].map = map;
597 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
599 pithosmap_to_object(&map_node[i], buf + pos);
600 pos += SHA256_DIGEST_SIZE;
601 r = insert_object(map, &map_node[i]); //FIXME error check
/* Size of a pithos map is implied by the number of entries found. */
603 map->size = i * block_size;
605 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
608 //FIXME cleanup on error
/* Associate (mn != NULL) or dissociate (mn == NULL) a map_node with an
 * in-flight copyup request in mio->copyups_nodes, growing/shrinking the hash
 * and retrying on -XHASH_ERESIZE. Also caches the node in mio->copyup_node. */
615 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
620 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
621 if (r == -XHASH_ERESIZE) {
622 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
623 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
626 mio->copyups_nodes = new_hashmap;
627 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
/* mn == NULL branch: remove the association. */
631 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
632 if (r == -XHASH_ERESIZE) {
633 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
634 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
637 mio->copyups_nodes = new_hashmap;
638 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
643 mio->copyup_node = mn;
/* Fetch the map_node associated with a copyup request; falls back to the
 * cached mio->copyup_node when the hash lookup fails. */
647 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
651 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
656 return mio->copyup_node;
/* Copy-on-write an object: derive the new object name as the hexlified
 * SHA-256 of "<volume><objectidx>", then either submit an X_COPY to the data
 * blocker, or — when the source is the zero block — skip the copy and write
 * the new map entry directly. Returns MF_PENDING on submission (per callers);
 * sets MF_OBJECT_COPYING / MF_OBJECT_WRITING accordingly. */
659 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
661 struct mapperd *mapper = __get_mapperd(peer);
662 struct mapper_io *mio = __get_mapper_io(pr);
663 struct map *map = mn->map;
668 //struct sha256_ctx sha256ctx;
669 uint32_t newtargetlen;
670 char new_target[XSEG_MAX_TARGETLEN + 1];
671 unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
672 char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
673 strncpy(new_object, map->volume, map->volumelen);
674 sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
675 new_object[XSEG_MAX_TARGETLEN + 19] = 0;
/* Hash the synthetic name; the hexlified digest is the new object name. */
677 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
678 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
679 sprintf (new_target + 2*i, "%02x", buf[i]);
680 newtargetlen = SHA256_DIGEST_SIZE * 2;
/* Source is the zero block: no data to copy, just update the map entry. */
682 if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
683 goto copyup_zeroblock;
685 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
686 mapper->bportno, X_ALLOC);
689 r = xseg_prep_request(peer->xseg, req, newtargetlen,
690 sizeof(struct xseg_request_copy));
694 char *target = xseg_get_target(peer->xseg, req);
695 strncpy(target, new_target, req->targetlen);
697 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
698 strncpy(xcopy->target, mn->object, mn->objectlen);
699 xcopy->targetlen = mn->objectlen;
702 req->size = block_size;
704 r = xseg_set_req_data(peer->xseg, req, pr);
/* Remember which map_node this in-flight copyup belongs to. */
707 r = __set_copyup_node(mio, req, mn);
708 p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
712 xseg_signal(peer->xseg, p);
715 mn->flags |= MF_OBJECT_COPYING;
716 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* Error path: drop the association and release the request. */
720 r = __set_copyup_node(mio, req, NULL);
721 xseg_get_req_data(peer->xseg, req, &dummy);
723 xseg_put_request(peer->xseg, req, peer->portno);
725 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
729 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
730 "Proceeding in writing the new object in map");
731 /* construct a tmp map_node for writing purposes */
732 struct map_node newmn = *mn;
733 newmn.flags = MF_OBJECT_EXIST;
734 strncpy(newmn.object, new_target, newtargetlen);
735 newmn.object[newtargetlen] = 0;
736 newmn.objectlen = newtargetlen;
737 newmn.objectidx = mn->objectidx;
/* NOTE(review): this path is reached by the goto at the zero-block check,
 * which jumps over the initialization of `req` — `req` looks uninitialized
 * here, so associating it is likely a bug; confirm against the full file. */
738 r = __set_copyup_node(mio, req, mn);
739 r = object_write(peer, pr, map, &newmn);
740 if (r != MF_PENDING){
741 XSEGLOG2(&lc, E, "Object write returned error for object %s"
742 "\n\t of map %s [%llu]",
743 mn->object, map->volume, (unsigned long long) mn->objectidx);
746 mn->flags |= MF_OBJECT_WRITING;
747 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
752 * request handling functions
/* Completion handler for a map-block X_READ issued by load_map: parse the
 * block into the map, clear MF_MAP_LOADING, and re-dispatch every pr queued
 * on the map. On failure, fail the queued prs and tear the map down. */
755 static int handle_mapread(struct peerd *peer, struct peer_req *pr,
756 struct xseg_request *req)
760 char buf[XSEG_MAX_TARGETLEN];
761 struct mapperd *mapper = __get_mapperd(peer);
762 //assert req->op = X_READ;
763 char *target = xseg_get_target(peer->xseg, req);
764 struct map *map = find_map(mapper, target, req->targetlen);
767 //assert map->flags & MF_MAP_LOADING
769 if (req->state & XS_FAILED)
772 char *data = xseg_get_data(peer->xseg, req);
773 r = read_map(peer, map, data);
777 xseg_put_request(peer->xseg, req, peer->portno);
778 map->flags &= ~MF_MAP_LOADING;
779 XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
/* Snapshot the count so prs re-queued during dispatch are not re-run here. */
780 uint64_t qsize = xq_count(&map->pending);
781 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
783 struct peer_req *preq = (struct peer_req *) idx;
784 my_dispatch(peer, preq, preq->req);
/* Failure: fail queued prs and remove the half-loaded map. */
789 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
790 xseg_put_request(peer->xseg, req, peer->portno);
791 map->flags &= ~MF_MAP_LOADING;
792 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
793 struct peer_req *preq = (struct peer_req *) idx;
796 remove_map(mapper, map);
797 //FIXME not freeing up all objects + object hash
/* No map found for the completed request's target. */
802 strncpy(buf, target, req->targetlen);
803 buf[req->targetlen] = 0;
804 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
805 xseg_put_request(peer->xseg, req, peer->portno);
/* Completion handler for a full-map X_WRITE issued by map_write: clear
 * MF_MAP_WRITING and re-dispatch queued prs; on failure, fail queued prs and
 * remove the map. Mirrors handle_mapread. */
809 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
810 struct xseg_request *req)
813 char buf[XSEG_MAX_TARGETLEN];
814 struct mapperd *mapper = __get_mapperd(peer);
815 //assert req->op = X_WRITE;
816 char *target = xseg_get_target(peer->xseg, req);
817 struct map *map = find_map(mapper, target, req->targetlen);
/* NOTE(review): raw fprintf(stderr) here while the rest of the file uses
 * XSEGLOG2 — likely leftover debugging. */
819 fprintf(stderr, "couldn't find map\n");
822 //assert map->flags & MF_MAP_WRITING
824 if (req->state & XS_FAILED){
825 fprintf(stderr, "write request failed\n");
829 xseg_put_request(peer->xseg, req, peer->portno);
830 map->flags &= ~MF_MAP_WRITING;
831 XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
/* Snapshot the count so prs re-queued during dispatch are not re-run here. */
832 uint64_t qsize = xq_count(&map->pending);
833 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
835 struct peer_req *preq = (struct peer_req *) idx;
836 my_dispatch(peer, preq, preq->req);
/* Failure: fail queued prs and remove the map. */
842 XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
843 xseg_put_request(peer->xseg, req, peer->portno);
844 map->flags &= ~MF_MAP_WRITING;
845 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
846 struct peer_req *preq = (struct peer_req *) idx;
849 remove_map(mapper, map);
850 //FIXME not freeing up all objects + object hash
/* No map found for the completed request's target. */
855 strncpy(buf, target, req->targetlen);
856 buf[req->targetlen] = 0;
857 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
858 xseg_put_request(peer->xseg, req, peer->portno);
/* Handle an X_CLONE request: load/find the base map, build a new clone map
 * of the requested size (objects shared with the base where present, the
 * zero block elsewhere), insert it, and write it out asynchronously.
 * Re-entered on X_WRITE completion (routes to handle_mapwrite) and again
 * when mio->state == WRITING to complete the clone. */
862 static int handle_clone(struct peerd *peer, struct peer_req *pr,
863 struct xseg_request *req)
865 struct mapperd *mapper = __get_mapperd(peer);
866 struct mapper_io *mio = __get_mapper_io(pr);
869 char buf[XSEG_MAX_TARGETLEN + 1];
872 if (pr->req->op != X_CLONE) {
874 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
/* Reply to a map write we issued earlier for this clone. */
879 if (req->op == X_WRITE){
880 //assert state = WRITING;
881 r = handle_mapwrite(peer, pr ,req);
883 XSEGLOG2(&lc, E, "handle mapwrite returned error");
/* Map write finished: the clone is complete. */
889 if (mio->state == WRITING) {
890 target = xseg_get_target(peer->xseg, pr->req);
891 strncpy(buf, target, req->targetlen);
892 buf[req->targetlen] = 0;
893 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
/* Fresh clone request: resolve the base map first. */
898 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
903 r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
907 else if (r == MF_PENDING)
910 if (map->flags & MF_MAP_DESTROYED) {
911 strncpy(buf, xclone->target, xclone->targetlen);
912 buf[xclone->targetlen] = 0;
913 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
914 target = xseg_get_target(peer->xseg, pr->req);
915 strncpy(buf, target, req->targetlen);
916 buf[req->targetlen] = 0;
917 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
922 struct map *clonemap = malloc(sizeof(struct map));
/* NOTE(review): the three lines below read as FIXME prose, presumably inside
 * an elided comment block — confirm they are commented out in the full file. */
927 FIXME check if clone map exists
928 find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
929 ... (on destroyed what ??
931 target = xseg_get_target(peer->xseg, pr->req);
932 strncpy(buf, target, req->targetlen);
933 buf[req->targetlen] = 0;
934 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
939 //alloc and init struct map
940 clonemap->objects = xhash_new(3, INTEGER);
941 if (!clonemap->objects){
942 goto out_err_clonemap;
944 xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
946 goto out_err_objhash;
/* A clone may grow but never shrink relative to its base. */
948 if (xclone->size < map->size) {
949 target = xseg_get_target(peer->xseg, pr->req);
950 strncpy(buf, target, req->targetlen);
951 buf[req->targetlen] = 0;
952 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
953 "\n\t for requested clone %s",
954 (unsigned long long) xclone->size,
955 (unsigned long long) map->size, buf);
/* size == -1 means "same size as the base map". */
958 if (xclone->size == -1)
959 clonemap->size = map->size;
961 clonemap->size = xclone->size;
963 target = xseg_get_target(peer->xseg, pr->req);
964 strncpy(clonemap->volume, target, pr->req->targetlen);
965 clonemap->volumelen = pr->req->targetlen;
966 clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
968 //alloc and init map_nodes
969 unsigned long c = clonemap->size/block_size + 1;
970 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
/* Share the base object where one exists, else point at the zero block. */
975 for (i = 0; i < clonemap->size/block_size + 1; i++) {
976 struct map_node *mn = find_object(map, i);
978 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
979 map_nodes[i].objectlen = mn->objectlen;
981 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
982 map_nodes[i].objectlen = strlen(zero_block);
984 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
985 map_nodes[i].flags = 0;
986 map_nodes[i].objectidx = i;
987 map_nodes[i].map = clonemap;
988 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
989 r = insert_object(clonemap, &map_nodes[i]);
995 r = insert_map(mapper, clonemap);
997 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
/* Persist the new clone map; completion resumes via mio->state == WRITING. */
1000 r = map_write(peer, pr, clonemap);
1002 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1005 else if (r == MF_PENDING) {
1006 //maybe move this to map_write
1007 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1008 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1009 mio->state = WRITING;
1013 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
/* Error unwinding: undo insertion, free pending queue and object hash. */
1020 remove_map(mapper, clonemap);
1022 //FIXME not freeing allocated queues of map_nodes
1025 xq_free(&clonemap->pending);
1027 xhash_free(clonemap->objects);
1031 target = xseg_get_target(peer->xseg, pr->req);
1032 strncpy(buf, target, req->targetlen);
1033 buf[req->targetlen] = 0;
1034 XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
/* Translate a client's (offset, size) I/O request into an xseg_reply_map
 * scatterlist of (object, offset, size) segments. For writes, objects that
 * are busy or need copy-on-write suspend the request: the pr is queued on
 * the object and resumed when the copyup/write completes. */
1039 static int req2objs(struct peerd *peer, struct peer_req *pr,
1040 struct map *map, int write)
1042 char *target = xseg_get_target(peer->xseg, pr->req);
1043 uint32_t nr_objs = calc_nr_obj(pr->req);
1044 uint64_t size = sizeof(struct xseg_reply_map) +
1045 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1047 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1048 /* resize request to fit reply */
/* Save the target name: resizing may relocate/clobber the request buffer. */
1049 char buf[XSEG_MAX_TARGETLEN];
1050 strncpy(buf, target, pr->req->targetlen);
1051 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1053 XSEGLOG2(&lc, E, "Cannot resize request");
1056 target = xseg_get_target(peer->xseg, pr->req);
1057 strncpy(target, buf, pr->req->targetlen);
1059 /* structure reply */
1060 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1061 reply->cnt = nr_objs;
/* First object: may start mid-block. */
1064 uint64_t rem_size = pr->req->size;
1065 uint64_t obj_index = pr->req->offset / block_size;
1066 uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1067 uint64_t obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1068 struct map_node * mn = find_object(map, obj_index);
1070 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
/* Writes must wait for in-flight ops and trigger copy-on-write for
 * not-yet-existing objects. */
1073 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1074 goto out_object_copying;
1075 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1076 //calc new_target, copy up object
1077 r = copyup_object(peer, mn, pr);
1079 XSEGLOG2(&lc, E, "Error in copy up object");
1082 goto out_object_copying;
1085 // XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n",
1086 // (unsigned long long) pr->req->offset,
1087 // (unsigned long long) pr->req->size,
1089 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1090 reply->segs[idx].targetlen = mn->objectlen;
1091 reply->segs[idx].offset = obj_offset;
1092 reply->segs[idx].size = obj_size;
1093 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1094 // (unsigned long long) reply->segs[idx].size,
1095 // (unsigned long long) reply->segs[idx].offset);
1096 rem_size -= obj_size;
/* Subsequent objects: always start at offset 0 within the block. */
1097 while (rem_size > 0) {
1101 obj_size = (rem_size > block_size) ? block_size : rem_size;
1102 rem_size -= obj_size;
1103 mn = find_object(map, obj_index);
1105 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1108 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1109 goto out_object_copying;
1110 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1111 //calc new_target, copy up object
1112 r = copyup_object(peer, mn, pr);
1114 XSEGLOG2(&lc, E, "Error in copy up object");
1117 goto out_object_copying;
1119 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1120 reply->segs[idx].targetlen = mn->objectlen;
1121 reply->segs[idx].offset = obj_offset;
1122 reply->segs[idx].size = obj_size;
1123 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1124 // (unsigned long long) reply->segs[idx].size,
1125 // (unsigned long long) reply->segs[idx].offset);
/* Sanity: the precomputed count must match the segments emitted. */
1127 if (reply->cnt != (idx + 1)){
1128 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
/* Suspend path: park the pr on the busy/copying object's queue. */
1135 //printf("r2o mn: %lx\n", mn);
1136 //printf("volume %s pending on %s\n", map->volume, mn->object);
1138 if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1139 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1140 XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1141 mn->object, (unsigned long long) mn->objectidx, map->volume);
/* Handle a map-read (X_MAPR) request: resolve the map, translate the
 * request's range into a scatterlist via req2objs (write=0), and log the
 * resulting segments at debug level. */
1149 static int handle_mapr(struct peerd *peer, struct peer_req *pr,
1150 struct xseg_request *req)
1152 struct mapperd *mapper = __get_mapperd(peer);
1153 struct mapper_io *mio = __get_mapper_io(pr);
1157 char *target = xseg_get_target(peer->xseg, pr->req);
1159 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1164 else if (r == MF_PENDING)
1167 if (map->flags & MF_MAP_DESTROYED) {
/* write=0: reads never trigger copy-on-write. */
1173 r = req2objs(peer, pr, map, 0);
1175 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1177 (unsigned long long) pr->req->offset,
1178 (unsigned long long) (pr->req->offset + pr->req->size));
1182 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1184 (unsigned long long) pr->req->offset,
1185 (unsigned long long) (pr->req->offset + pr->req->size));
1186 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1187 (unsigned long long) req->offset,
1188 (unsigned long long) req->size);
/* Debug dump of each reply segment. */
1189 char buf[XSEG_MAX_TARGETLEN+1];
1190 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1192 for (i = 0; i < reply->cnt; i++) {
1193 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1194 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1195 buf[reply->segs[i].targetlen] = 0;
1196 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1197 (unsigned long long) reply->segs[i].offset,
1198 (unsigned long long) reply->segs[i].size);
/* Completion handler for an X_COPY copyup: on success, persist the new
 * object name into the map by issuing an object_write (via a temporary
 * map_node) and mark the node MF_OBJECT_WRITING; on failure, release the
 * request and fail the prs queued on the object. */
1207 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1208 struct xseg_request *req)
1210 struct mapperd *mapper = __get_mapperd(peer);
1212 struct mapper_io *mio = __get_mapper_io(pr);
1215 struct map_node *mn = __get_copyup_node(mio, req);
1219 mn->flags &= ~MF_OBJECT_COPYING;
1220 if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1221 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1224 struct map *map = mn->map;
1226 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1230 /* construct a tmp map_node for writing purposes */
/* The copy request's target carries the new object's name. */
1231 char *target = xseg_get_target(peer->xseg, req);
1232 struct map_node newmn = *mn;
1233 newmn.flags = MF_OBJECT_EXIST;
1234 strncpy(newmn.object, target, req->targetlen);
1235 newmn.object[req->targetlen] = 0;
1236 newmn.objectlen = req->targetlen;
1237 newmn.objectidx = mn->objectidx;
1238 r = object_write(peer, pr, map, &newmn);
1239 if (r != MF_PENDING){
1240 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1241 "\n\t of map %s [%llu]",
1242 mn->object, map->volume, (unsigned long long) mn->objectidx);
1245 mn->flags |= MF_OBJECT_WRITING;
1246 xseg_put_request(peer->xseg, req, peer->portno);
1247 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/* Failure path: drop the copyup association and drain the object's queue. */
1251 xseg_put_request(peer->xseg, req, peer->portno);
1252 __set_copyup_node(mio, req, NULL);
1253 while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1254 struct peer_req * preq = (struct peer_req *) idx;
1260 XSEGLOG2(&lc, E, "Cannot get map node");
/* Completion handler for the X_WRITE that persists a single map entry after
 * a copyup: update the in-memory map_node from the written entry, then
 * re-dispatch every pr that was waiting on the object. */
1264 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1265 struct xseg_request *req)
1268 struct mapperd *mapper = __get_mapperd(peer);
1269 struct mapper_io *mio = __get_mapper_io(pr);
1270 //assert req->op = X_WRITE;
1271 char *target = xseg_get_target(peer->xseg, req);
1274 //printf("handle object write replyi\n");
1275 struct map_node *mn = __get_copyup_node(mio, req);
1279 __set_copyup_node(mio, req, NULL);
1281 //assert mn->flags & MF_OBJECT_WRITING
1282 mn->flags &= ~MF_OBJECT_WRITING;
1283 if (req->state & XS_FAILED)
/* Re-parse the written entry to pick up the new object name. */
1286 struct map_node tmp;
1287 char *data = xseg_get_data(peer->xseg, req);
1288 map_to_object(&tmp, data);
1289 mn->flags |= MF_OBJECT_EXIST;
/* After clearing WRITING and setting EXIST, no other flag should remain. */
1290 if (mn->flags != MF_OBJECT_EXIST){
1291 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1294 //assert mn->flags & MF_OBJECT_EXIST
1295 strncpy(mn->object, tmp.object, tmp.objectlen);
1296 mn->object[tmp.objectlen] = 0;
1297 mn->objectlen = tmp.objectlen;
1298 xseg_put_request(peer->xseg, req, peer->portno);
1300 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
/* Snapshot the count so prs re-queued during dispatch are not re-run here. */
1301 uint64_t qsize = xq_count(&mn->pending);
1302 while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1304 struct peer_req * preq = (struct peer_req *) idx;
1305 my_dispatch(peer, preq, preq->req);
/* Failure path: fail everything queued on the object. */
1310 XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1311 xseg_put_request(peer->xseg, req, peer->portno);
1312 while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1313 struct peer_req *preq = (struct peer_req *) idx;
1319 XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1320 xseg_put_request(peer->xseg, req, peer->portno);
/*
 * Handle an X_MAPW (map-write) request.  Internal replies (X_COPY copy-ups
 * and X_WRITE map-block writes) are routed to their dedicated handlers
 * first; otherwise the target map is looked up (loading it if needed) and
 * req2objs(..., write=1) resolves the request range into object segments,
 * triggering copy-ups where objects do not yet exist.
 * NOTE(review): the log messages below say "Map r ..." although this is the
 * map-WRITE handler — looks copy-pasted from handle_mapr; confirm and fix
 * the message text.  Many lines are elided in this view.
 */
1324 static int handle_mapw(struct peerd *peer, struct peer_req *pr,
1325 struct xseg_request *req)
1327 struct mapperd *mapper = __get_mapperd(peer);
1328 struct mapper_io *mio = __get_mapper_io(pr);
1331 /* handle copy up replies separately */
1332 if (req->op == X_COPY){
1333 if (handle_copyup(peer, pr, req) < 0){
1334 XSEGLOG2(&lc, E, "Handle copy up returned error");
1341 else if(req->op == X_WRITE){
1342 /* handle replies of object write operations */
1343 if (handle_objectwrite(peer, pr, req) < 0) {
1344 XSEGLOG2(&lc, E, "Handle object write returned error");
/* fresh map-write request path: resolve the target map */
1352 char *target = xseg_get_target(peer->xseg, pr->req);
1354 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1359 else if (r == MF_PENDING)
/* destroyed maps must not be written */
1362 if (map->flags & MF_MAP_DESTROYED) {
/* translate the request range to objects; 1 = write (may issue copy-ups) */
1367 r = req2objs(peer, pr, map, 1);
1369 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1371 (unsigned long long) pr->req->offset,
1372 (unsigned long long) (pr->req->offset + pr->req->size));
1376 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1378 (unsigned long long) pr->req->offset,
1379 (unsigned long long) (pr->req->offset + pr->req->size));
1380 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1381 (unsigned long long) req->offset,
1382 (unsigned long long) req->size);
/* debug dump of the reply segments (object name is NUL-terminated first) */
1383 char buf[XSEG_MAX_TARGETLEN+1];
1384 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1386 for (i = 0; i < reply->cnt; i++) {
1387 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1388 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1389 buf[reply->segs[i].targetlen] = 0;
1390 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1391 (unsigned long long) reply->segs[i].offset,
1392 (unsigned long long) reply->segs[i].size);
1396 //else copyup pending, wait for pr restart
/*
 * Handle an X_SNAPSHOT request.  Body not visible in this view; the
 * corresponding dispatch case in my_dispatch() is commented out, so this
 * handler appears to be unwired / not yet implemented — confirm.
 */
1401 static int handle_snap(struct peerd *peer, struct peer_req *pr,
1402 struct xseg_request *req)
/*
 * Handle an X_INFO request: look up (or load) the target map and reply
 * with its size via struct xseg_reply_info placed in the request data.
 * NOTE(review): error/pending paths between the visible lines are elided.
 */
1408 static int handle_info(struct peerd *peer, struct peer_req *pr,
1409 struct xseg_request *req)
1411 struct mapperd *mapper = __get_mapperd(peer);
1412 struct mapper_io *mio = __get_mapper_io(pr);
1415 char *target = xseg_get_target(peer->xseg, pr->req);
1420 //printf("Handle info\n");
1422 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1427 else if (r == MF_PENDING)
/* destroyed maps report no info */
1429 if (map->flags & MF_MAP_DESTROYED) {
/* reply payload is written in place over the request data buffer */
1434 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1435 xinfo->size = map->size;
/*
 * Issue an X_DELETE for a single map object to the data blocker
 * (mapper->bportno).  If the object already has pending requests, this
 * peer_req is queued behind them instead.  On submit success the object is
 * flagged MF_OBJECT_DELETING and MF_PENDING is (presumably) returned; on
 * failure the request resources are released.
 * NOTE(review): return statements and error branches are elided in this
 * view — the exact return-value contract is inferred from callers.
 */
1441 static int delete_object(struct peerd *peer, struct peer_req *pr,
1442 struct map_node *mn)
1445 struct mapperd *mapper = __get_mapperd(peer);
1446 struct mapper_io *mio = __get_mapper_io(pr);
/* remember which object index is being deleted, for resume on reply */
1448 mio->delobj = mn->objectidx;
1449 if (xq_count(&mn->pending) != 0) {
1450 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1451 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
/* allocate a request towards the data blocker */
1456 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
1457 mapper->bportno, X_ALLOC);
1460 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1463 char *target = xseg_get_target(peer->xseg, req);
1464 strncpy(target, mn->object, req->targetlen);
1466 req->size = req->datalen;
/* associate pr with the request so the reply can be routed back */
1469 r = xseg_set_req_data(peer->xseg, req, pr);
/* also track which map_node this deletion belongs to */
1472 __set_copyup_node(mio, req, mn);
1473 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1476 r = xseg_signal(peer->xseg, p);
1477 mn->flags |= MF_OBJECT_DELETING;
1478 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
/* error cleanup: detach pr from the request before releasing it */
1482 xseg_get_req_data(peer->xseg, req, &dummy);
1484 xseg_put_request(peer->xseg, req, peer->portno);
1486 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1491 * Find next object for deletion. Start searching on idx mio->delobj.
1492 * Skip non existing map_nodes, free_resources and skip non-existing objects
1493 * Wait for all pending operations on the object, before moving forward to the
1496 * Return MF_PENDING if theres is a pending operation on the next object
1497 * or zero if there is no next object
1499 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
/* (map parameter and opening brace elided in this view) */
1502 struct mapperd *mapper = __get_mapperd(peer);
1503 struct mapper_io *mio = __get_mapper_io(pr);
/* resume scanning from the index recorded by the previous delete_object() */
1504 uint64_t idx = mio->delobj;
1505 struct map_node *mn;
1508 while (idx < calc_map_obj(map)) {
1509 mn = find_object(map, idx);
/* an object with in-flight requests blocks deletion: queue pr behind them */
1515 if (xq_count(&mn->pending) != 0) {
1516 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1517 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
/* only objects that actually exist on the blocker need an X_DELETE */
1521 if (mn->flags & MF_OBJECT_EXIST){
1522 r = delete_object(peer, pr, mn);
1524 /* on error, just log it, release resources and
1525 * proceed to the next object
1527 XSEGLOG2(&lc, E, "Object %s delete object return error"
1528 "\n\t Map: %s [%llu]",
1529 mn->object, mn->map->volume,
1530 (unsigned long long) mn->objectidx);
1531 xq_free(&mn->pending);
1533 else if (r == MF_PENDING){
/* non-existing object: just release its queue and move on */
1537 xq_free(&mn->pending);
/*
 * Completion handler for a single object's X_DELETE during map deletion.
 * Regardless of err (the map block itself was already removed, so the map
 * delete is reported OK either way), the object's resources are freed and
 * deletion advances to the next object via delete_next_object().  When no
 * object remains pending, the map is marked DESTROYED, all peer_reqs queued
 * on it are restarted (to fail), and the map is removed from the cache.
 * NOTE(review): lines are elided in this view; the loop that frees the
 * remaining map_nodes after find_object(map, 0) is not visible.
 */
1544 static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
1545 struct map_node *mn, int err)
1547 struct mapperd *mapper = __get_mapperd(peer);
1548 struct mapper_io *mio = __get_mapper_io(pr);
1550 struct map *map = mn->map;
1553 //if object deletion failed, map deletion must continue
1554 //and report OK, since map block has been deleted succesfully
1555 //so, no check for err
1557 //assert object flags OK
1558 //free map_node_resources
1559 mn->flags &= ~MF_OBJECT_DELETING;
1560 xq_free(&mn->pending);
1563 r = delete_next_object(peer, pr, map);
1564 if (r != MF_PENDING){
1565 /* if there is no next object to delete, remove the map block
1569 //assert map flags OK
1570 map->flags |= MF_MAP_DESTROYED;
1571 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1572 //make all pending requests on map to fail
/* snapshot length so entries requeued by my_dispatch aren't re-popped */
1573 uint64_t qsize = xq_count(&map->pending);
1574 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1576 struct peer_req * preq = (struct peer_req *) idx;
1577 my_dispatch(peer, preq, preq->req);
1579 //free map resources;
1580 remove_map(mapper, map);
/* NOTE(review): find_object() on a map just passed to remove_map() —
 * confirm remove_map() only unhashes the map and does not free it */
1581 mn = find_object(map, 0);
1583 xq_free(&map->pending);
1586 XSEGLOG2(&lc, I, "Handle object delete OK");
/*
 * Issue an X_DELETE for the map block itself to the map blocker
 * (mapper->mbportno).  On submit success the map is flagged
 * MF_MAP_DELETING (presumably returning MF_PENDING); on failure the
 * request resources are released.  Mirrors delete_object() but targets
 * the volume's map block instead of a data object.
 * NOTE(review): return statements and error branches are elided here.
 */
1590 static int delete_map(struct peerd *peer, struct peer_req *pr,
/* (map parameter and opening brace elided in this view) */
1594 struct mapperd *mapper = __get_mapperd(peer);
1595 struct mapper_io *mio = __get_mapper_io(pr);
1596 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
1597 mapper->mbportno, X_ALLOC);
1600 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1603 char *target = xseg_get_target(peer->xseg, req);
1604 strncpy(target, map->volume, req->targetlen);
1606 req->size = req->datalen;
/* attach pr so the reply gets routed back through dispatch */
1609 r = xseg_set_req_data(peer->xseg, req, pr);
/* no map_node for a map-block delete; register NULL marker */
1612 __set_copyup_node(mio, req, NULL);
1613 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1616 r = xseg_signal(peer->xseg, p);
1617 map->flags |= MF_MAP_DELETING;
1618 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
/* error cleanup: detach pr from the request before releasing it */
1622 xseg_get_req_data(peer->xseg, req, &dummy);
1624 xseg_put_request(peer->xseg, req, peer->portno);
1626 XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
/*
 * Completion handler for the map block's X_DELETE.  On err: clear the
 * DELETING flag and restart every queued peer_req so they observe the
 * failure.  On success: mark the map DESTROYED and start deleting its data
 * objects via delete_next_object(); if nothing is left pending, fail the
 * remaining queued requests, remove the map from the cache and free it.
 * NOTE(review): lines are elided in this view; the loop freeing the
 * remaining map_nodes after find_object(map, 0) is not visible.
 */
1630 static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
1631 struct map *map, int err)
1633 struct mapperd *mapper = __get_mapperd(peer);
1634 struct mapper_io *mio = __get_mapper_io(pr);
1638 map->flags &= ~MF_MAP_DELETING;
1640 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1641 //dispatch all pending
1642 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1643 struct peer_req * preq = (struct peer_req *) idx;
1644 my_dispatch(peer, preq, preq->req);
/* success path: map block is gone, proceed to delete the data objects */
1647 map->flags |= MF_MAP_DESTROYED;
1648 //delete all objects
1649 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1651 r = delete_next_object(peer, pr, map);
1652 if (r != MF_PENDING){
1653 /* if there is no next object to delete, remove the map block
1656 //assert map flags OK
1657 map->flags |= MF_MAP_DESTROYED;
1658 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1659 //make all pending requests on map to fail
/* snapshot length so entries requeued by my_dispatch aren't re-popped */
1660 uint64_t qsize = xq_count(&map->pending);
1661 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1663 struct peer_req * preq = (struct peer_req *) idx;
1664 my_dispatch(peer, preq, preq->req);
1666 //free map resources;
1667 remove_map(mapper, map);
/* NOTE(review): find_object() on a map just passed to remove_map() —
 * confirm remove_map() only unhashes the map and does not free it */
1668 struct map_node *mn = find_object(map, 0);
1671 xq_free(&map->pending);
/*
 * Route an X_DELETE reply to the right continuation: if the deleted target
 * matches a cached map, this was the map block's deletion
 * (handle_map_delete); otherwise it was a single object's deletion
 * (handle_object_delete, using the map_node registered in copyups_nodes).
 * The xseg request is released on every visible path.
 * NOTE(review): branch structure between the visible lines is elided.
 */
1678 static int handle_delete(struct peerd *peer, struct peer_req *pr,
1679 struct xseg_request *req)
1681 struct mapperd *mapper = __get_mapperd(peer);
1682 struct mapper_io *mio = __get_mapper_io(pr);
1683 struct map_node *mn;
/* a request that failed without being served carries err to the handlers */
1686 if (req->state & XS_FAILED && !(req->state &XS_SERVED))
1689 mn = __get_copyup_node(mio, req);
1690 __set_copyup_node(mio, req, NULL);
1691 char *target = xseg_get_target(peer->xseg, req);
/* target names a cached map => this reply is for the map block itself */
1694 map = find_map(mapper, target, req->targetlen);
1696 xseg_put_request(peer->xseg, req, peer->portno);
1699 handle_map_delete(peer, pr, map, err);
/* otherwise the reply is for one of the map's data objects */
1704 xseg_put_request(peer->xseg, req, peer->portno);
1707 handle_object_delete(peer, pr, mn, err);
1709 xseg_put_request(peer->xseg, req, peer->portno);
/*
 * Handle an X_DELETE from a client (destroy a volume).  Internal X_DELETE
 * replies (pr->req != req) are forwarded to handle_delete().  For a fresh
 * request: load the map; if already destroyed, report accordingly; if a
 * deletion is in progress (mio->state == DELETING) resume object deletion,
 * otherwise start by deleting the map block via delete_map() and queue pr
 * on the map until it completes.
 * NOTE(review): the debug-buffer code below copies pr->req->targetlen bytes
 * but NUL-terminates at req->targetlen; when pr->req != req these lengths
 * can differ, leaving buf unterminated or mis-truncated — confirm and fix.
 * Many lines are elided in this view.
 */
1713 static int handle_destroy(struct peerd *peer, struct peer_req *pr,
1714 struct xseg_request *req)
1716 struct mapperd *mapper = __get_mapperd(peer);
1717 struct mapper_io *mio = __get_mapper_io(pr);
1720 char buf[XSEG_MAX_TARGETLEN+1];
1721 char *target = xseg_get_target(peer->xseg, pr->req);
1723 strncpy(buf, target, pr->req->targetlen);
1724 buf[req->targetlen] = 0;
1726 XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1727 (unsigned long) pr, (unsigned long) pr->req,
1728 (unsigned long) req);
1729 XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
/* internal reply: req is a completed X_DELETE we issued earlier */
1730 if (pr->req != req && req->op == X_DELETE) {
1731 //assert mio->state == DELETING
1732 r = handle_delete(peer, pr, req);
1734 XSEGLOG2(&lc, E, "Handle delete returned error");
/* fresh client request path */
1743 r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1748 else if (r == MF_PENDING)
1750 if (map->flags & MF_MAP_DESTROYED) {
1751 if (mio->state == DELETING){
1752 XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1756 XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1761 if (mio->state == DELETING) {
1762 //continue deleting map objects;
1763 r = delete_next_object(peer ,pr, map);
1764 if (r != MF_PENDING){
/* begin deletion: remove the map block first */
1770 r = delete_map(peer, pr, map);
1772 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1775 } else if (r == MF_PENDING) {
1776 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1777 __xq_append_tail(&map->pending, (xqindex) pr);
1778 mio->state = DELETING;
1782 XSEGLOG2(&lc, E, "Destroy unreachable");
/*
 * Handle an X_CLOSE request: drop the cached in-memory state for a map.
 * The map is flagged MF_MAP_DROPPING_CACHE to block new operations, then
 * each object's resources are freed once its pending queue drains (pr is
 * re-queued on objects that still have work in flight, resuming from
 * mio->dcobj).  When all objects are freed, queued map requests are
 * restarted and the map is removed from the cache and freed.
 * NOTE(review): lines are elided in this view; error returns and the
 * final cleanup loop after find_object(map, 0) are not visible.
 */
1787 static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
1788 struct xseg_request *req)
1790 struct mapperd *mapper = __get_mapperd(peer);
1791 struct mapper_io *mio = __get_mapper_io(pr);
1794 char *target = xseg_get_target(peer->xseg, pr->req);
/* only cached maps can be dropped; no loading is done for X_CLOSE */
1800 struct map *map = find_map(mapper, target, pr->req->targetlen);
1804 } else if (map->flags & MF_MAP_DESTROYED) {
/* busy map and we haven't started yet: wait behind its current work */
1807 } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1808 __xq_append_tail(&map->pending, (xqindex) pr);
1812 if (mio->state != DROPPING_CACHE) {
1813 /* block all future operations on the map */
1814 map->flags |= MF_MAP_DROPPING_CACHE;
/* record start; mio->dcobj (set elsewhere) tracks resume position */
1816 mio->state = DROPPING_CACHE;
1817 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1819 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1822 struct map_node *mn;
1824 for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1825 mn = find_object(map, i);
/* objects with in-flight requests: queue pr and resume later */
1829 if (xq_count(&mn->pending) != 0){
1830 XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu",
1831 map->volume, (unsigned long long) mn->objectidx);
1832 __xq_append_tail(&mn->pending, (xqindex) pr);
1835 xq_free(&mn->pending);
1836 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu",
1837 map->volume, (unsigned long long) mn->objectidx);
/* all objects drained: unhash the map and fail/restart queued requests */
1839 remove_map(mapper, map);
/* snapshot length so entries requeued by my_dispatch aren't re-popped */
1841 uint64_t qsize = xq_count(&map->pending);
1842 while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1844 struct peer_req * preq = (struct peer_req *) i;
1845 my_dispatch(peer, preq, preq->req);
1847 XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1849 //free map resources;
1850 mn = find_object(map, 0);
1853 xq_free(&map->pending);
/*
 * Internal dispatcher: route a request (fresh or an internal reply being
 * resumed) to its handler.  X_READ replies are map-block reads and go to
 * handle_mapread(); everything else is switched on the ORIGINAL client
 * op (pr->req->op), so internal sub-requests (copy-ups, object writes,
 * deletes) reach the handler of the operation that spawned them.
 */
1861 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1863 struct mapperd *mapper = __get_mapperd(peer);
1865 struct mapper_io *mio = __get_mapper_io(pr);
1868 if (req->op == X_READ) {
1869 /* catch map reads requests here */
1870 handle_mapread(peer, pr, req);
/* note: switch on pr->req->op (client op), not req->op */
1874 switch (pr->req->op) {
1875 /* primary xseg operations of mapper */
1876 case X_CLONE: handle_clone(peer, pr, req); break;
1877 case X_MAPR: handle_mapr(peer, pr, req); break;
1878 case X_MAPW: handle_mapw(peer, pr, req); break;
1879 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1880 case X_INFO: handle_info(peer, pr, req); break;
1881 case X_DELETE: handle_destroy(peer, pr, req); break;
1882 case X_CLOSE: handle_dropcache(peer, pr, req); break;
1883 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/*
 * Public peer entry point called by the xseg framework for every accepted
 * request or received reply.  Resets per-pr mapper state to ACCEPTED
 * (visible path) and forwards to my_dispatch().
 * NOTE(review): lines are elided; the accepted-vs-reply distinction that
 * presumably guards the state reset is not visible here.
 */
1888 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1890 struct mapperd *mapper = __get_mapperd(peer);
1892 struct mapper_io *mio = __get_mapper_io(pr);
1896 mio->state = ACCEPTED;
1897 my_dispatch(peer, pr ,req);
/*
 * Peer initialization: set up libgcrypt (thread callbacks, version check,
 * no secure memory), precompute the magic-string SHA256 and the hexlified
 * SHA256 of an all-zero block, allocate the mapperd state and one
 * mapper_io per peer op, then parse -bp (data blocker port), -mbp (map
 * blocker port) and -t (thread count; only 1 supported) arguments.
 * NOTE(review): in the visible lines the block_size buffer `zero` is
 * malloc'd but never freed, and the mallocs are unchecked (flagged by the
 * FIXME below) — confirm against the elided lines.
 */
1901 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1904 unsigned char buf[SHA256_DIGEST_SIZE];
/* must run before gcry_check_version when using pthreads */
1907 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1909 /* Version check should be the very first call because it
1910 makes sure that important subsystems are intialized. */
1911 gcry_check_version (NULL);
1913 /* Disable secure memory. */
1914 gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1916 /* Tell Libgcrypt that initialization has completed. */
1917 gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1919 /* calculate out magic sha hash value */
1920 gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1922 /* calculate zero block */
1923 //FIXME check hash value
1924 zero = malloc(block_size);
1925 memset(zero, 0, block_size);
1926 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
/* hexlify the digest into the global zero_block string */
1927 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1928 sprintf(zero_block + 2*i, "%02x", buf[i]);
1929 printf("%s \n", zero_block);
1932 //FIXME error checks
1933 struct mapperd *mapper = malloc(sizeof(struct mapperd));
1934 mapper->hashmaps = xhash_new(3, STRING);
1935 peer->priv = mapper;
/* one mapper_io (copyup tracking state) per concurrent peer op */
1937 for (i = 0; i < peer->nr_ops; i++) {
1938 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1939 mio->copyups_nodes = xhash_new(3, INTEGER);
1942 peer->peer_reqs[i].priv = mio;
1945 for (i = 0; i < argc; i++) {
1946 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1947 mapper->bportno = atoi(argv[i+1]);
1951 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1952 mapper->mbportno = atoi(argv[i+1]);
1956 /* enforce only one thread */
1957 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1958 int t = atoi(argv[i+1]);
1960 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
/* Debug helper: dump one map_node (index, name, length, exists flag)
 * to stderr. */
1973 void print_obj(struct map_node *mn)
1975 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
1976 (unsigned long long) mn->objectidx, mn->object,
1977 (unsigned int) mn->objectlen,
1978 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * Debug helper: dump a map's header (name, size, object count) to stderr,
 * then every object via find_object().  The object count rounds
 * size/block_size up (partial-block increment is on an elided line).
 * Maps implying more than 1,000,000 objects are skipped as a guard
 * against corrupt volume sizes.
 */
1981 void print_map(struct map *m)
1983 uint64_t nr_objs = m->size/block_size;
1984 if (m->size % block_size)
1986 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
1987 m->volume, m->volumelen,
1988 (unsigned long long) m->size,
1989 (unsigned long long) nr_objs);
1991 struct map_node *mn;
1992 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1994 for (i = 0; i < nr_objs; i++) {
1995 mn = find_object(m, i);
1997 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2005 void test_map(struct peerd *peer)
2008 //struct sha256_ctx sha256ctx;
2009 unsigned char buf[SHA256_DIGEST_SIZE];
2010 char buf_new[XSEG_MAX_TARGETLEN + 20];
2011 struct map *m = malloc(sizeof(struct map));
2012 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2013 m->volume[XSEG_MAX_TARGETLEN] = 0;
2014 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2015 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2016 m->volumelen = XSEG_MAX_TARGETLEN;
2017 m->size = 100*block_size;
2018 m->objects = xhash_new(3, INTEGER);
2019 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2020 for (i = 0; i < 100; i++) {
2021 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2022 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2024 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2025 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2027 map_node[i].objectidx = i;
2028 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2029 map_node[i].flags = MF_OBJECT_EXIST;
2030 ret = insert_object(m, &map_node[i]);
2033 char *data = malloc(block_size);
2034 mapheader_to_map(m, data);
2035 uint64_t pos = mapheader_size;
2037 for (i = 0; i < 100; i++) {
2038 map_node = find_object(m, i);
2040 printf("no object node %d \n", i);
2043 object_to_map(data+pos, map_node);
2044 pos += objectsize_in_map;
2048 struct map *m2 = malloc(sizeof(struct map));
2049 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2050 m->volume[XSEG_MAX_TARGETLEN] = 0;
2051 m->volumelen = XSEG_MAX_TARGETLEN;
2053 m2->objects = xhash_new(3, INTEGER);
2054 ret = read_map(peer, m2, data);
2057 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2059 while (sum < block_size) {
2060 r = write(fd, data + sum, block_size -sum);
2063 printf("write error\n");
2069 map_node = find_object(m, 0);