8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
16 GCRY_THREAD_OPTION_PTHREAD_IMPL;
/* Size in bytes of a raw SHA-256 digest. */
#define SHA256_DIGEST_SIZE 32
/* hex representation of sha256 value takes up double the sha256 size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
/* Object/volume names are hexlified SHA-256 digests, hence this bound. */
#define XSEG_MAX_TARGETLEN (SHA256_DIGEST_SIZE << 1)
/* 4 MiB of volume data per mapped object. */
#define block_size (1<<22) //FIXME this should be defined here?
#define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
/* Per-object state flags (stored in map_node flags). */
#define MF_OBJECT_EXIST (1 << 0)
#define MF_OBJECT_COPYING (1 << 1)
#define MF_OBJECT_WRITING (1 << 2)
#define MF_OBJECT_DELETING (1 << 3)
/* An object is busy while a copy-up, write or delete is in flight. */
#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
extern struct log_ctx lc; /* global logging context used by XSEGLOG2 */
char *magic_string = "This a magic string. Please hash me";
unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
//internal mapper states
/* (struct map_node fields — enclosing struct definition elided in this listing) */
char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
struct xq pending; /* pending peer_reqs on this object */
/* Per-map state flags (stored in map flags). */
#define MF_MAP_LOADING (1 << 0)
#define MF_MAP_DESTROYED (1 << 1)
#define MF_MAP_WRITING (1 << 2)
#define MF_MAP_DELETING (1 << 3)
/* A map is busy while it is being loaded, written or deleted. */
#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING)
/* (struct map fields — enclosing struct definition elided in this listing) */
char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
xhash_t *objects; /* obj_index --> map_node */
struct xq pending; /* pending peer_reqs on this map */
/* (struct mapperd fields — per-daemon state) */
xport bportno; /* blocker that accesses data */
xport mbportno; /* blocker that accesses maps */
xhash_t *hashmaps; // hash_function(target) --> struct map
/* (struct mapper_io fields — per-request state) */
volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
struct map_node *copyup_node; /* last node set; returned when hash lookup misses */
int err; /* error flag */
enum mapper_state state; /* internal state of this mapper io */
/* Forward declarations. */
static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
void print_map(struct map *m);
/* Return the per-daemon mapper state stashed in the peer's private pointer. */
static inline struct mapperd * __get_mapperd(struct peerd *peer)
	return (struct mapperd *) peer->priv;
/* Return the per-request mapper io state stashed in the peer_req's private pointer. */
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
	return (struct mapper_io *) pr->priv;
/* Number of objects needed to cover map->size: size / block_size,
 * rounded up when there is a partial trailing block. */
static inline uint64_t calc_map_obj(struct map *map)
	uint64_t nr_objs = map->size / block_size;
	if (map->size % block_size)
/* Count how many objects the byte range [req->offset, req->offset + req->size)
 * touches. The first object may be entered at a non-zero intra-block offset;
 * subsequent objects are consumed whole (or partially, for the last one). */
static uint32_t calc_nr_obj(struct xseg_request *req)
	uint64_t rem_size = req->size;
	uint64_t obj_offset = req->offset & (block_size -1); //modulo
	/* bytes served by the first object, capped at the block boundary */
	uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
	rem_size -= obj_size;
	while (rem_size > 0) {
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
132 * Maps handling functions
/* Look up a map by (non NUL-terminated) target name in mapper->hashmaps.
 * Returns the map, or NULL on lookup failure. */
static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
	struct map *m = NULL;
	char buf[XSEG_MAX_TARGETLEN+1];
	//assert targetlen <= XSEG_MAX_TARGETLEN
	/* NOTE(review): strncpy does not NUL-terminate; buf must get
	 * buf[targetlen] = 0 before being used as a hash key — confirm the
	 * elided line between this copy and the lookup does exactly that. */
	strncpy(buf, target, targetlen);
	r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
/* Insert a map into mapper->hashmaps keyed by its volume name.
 * Fails if a map with the same volume is already present.
 * Grows the hash table on -XHASH_ERESIZE and retries once. */
static int insert_map(struct mapperd *mapper, struct map *map)
	if (find_map(mapper, map->volume, map->volumelen)){
		XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
	r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	if (r == -XHASH_ERESIZE) {
		/* NOTE(review): the grow shift is computed from map->objects but
		 * the table being resized is mapper->hashmaps — this looks like a
		 * wrong-table bug; confirm and use
		 * xhash_grow_size_shift(mapper->hashmaps) instead. */
		xhashidx shift = xhash_grow_size_shift(map->objects);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
			XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
		mapper->hashmaps = new_hashmap;
		r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Remove a map from mapper->hashmaps by volume name.
 * Shrinks the hash table on -XHASH_ERESIZE and retries once. */
static int remove_map(struct mapperd *mapper, struct map *map)
	//assert no pending pr on map
	r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	if (r == -XHASH_ERESIZE) {
		/* NOTE(review): shrink shift is computed from map->objects while
		 * resizing mapper->hashmaps — same suspected wrong-table bug as in
		 * insert_map; confirm. */
		xhashidx shift = xhash_shrink_size_shift(map->objects);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
			XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
		mapper->hashmaps = new_hashmap;
		r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* Load the map for `target` asynchronously.
 * If the map is unknown: allocate and initialize a struct map in LOADING
 * state, register it in the mapper hash, queue `pr` on the map's pending
 * queue and submit an X_READ of the first block_size bytes to the map
 * blocker port (mbportno). Completion is handled in handle_mapread.
 * If the map already exists: either queue `pr` (still loading) or
 * dispatch it immediately (already loaded). */
static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
	struct xseg_request *req;
	struct mapperd *mapper = __get_mapperd(peer);
	//printf("Loading map\n");
	struct map *m = find_map(mapper, target, targetlen);
	/* not found: build a fresh map object in LOADING state */
	m = malloc(sizeof(struct map));
		XSEGLOG2(&lc, E, "Cannot allocate map ");
	strncpy(m->volume, target, targetlen);
	m->volume[targetlen] = 0;
	m->volumelen = targetlen;
	m->flags = MF_MAP_LOADING;
	xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
		XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
	m->objects = xhash_new(3, INTEGER);
		XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
	/* park the requesting pr until the map read completes */
	__xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
	r = insert_map(mapper, m);
	//printf("Loading map: preparing req\n");
	/* submit an X_READ for the map contents to the map blocker */
	req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
	r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, target, targetlen);
	req->size = block_size;
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
	XSEGLOG2(&lc, I, "Map %s loading", m->volume);
	/* --- error unwind: release request, unregister map, fail waiters --- */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, peer->portno);
	remove_map(mapper, m);
	while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
		fail(peer, (struct peer_req *) idx);
	xhash_free(m->objects);
	xq_free(&m->pending);
	XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
	/* --- map already present --- */
	//assert map loading when this is reached
	if (m->flags & MF_MAP_LOADING) {
		XSEGLOG2(&lc, I, "Map %s already exists and loading. "
				"Adding to pending queue", m->volume);
		__xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
	XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
	my_dispatch(peer, pr, pr->req);
/* Resolve a map for `target`: if cached and ready, return it via *m;
 * if cached but busy (MF_MAP_NOT_READY), queue `pr` on its pending
 * queue; otherwise kick off an asynchronous load_map. */
static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
				char *target, uint32_t targetlen, struct map **m)
	struct mapperd *mapper = __get_mapperd(peer);
	*m = find_map(mapper, target, targetlen);
	if ((*m)->flags & MF_MAP_NOT_READY) {
		/* busy: requeue pr; it is re-dispatched when the map settles */
		__xq_append_tail(&(*m)->pending, (xqindex) pr);
		XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
	//} else if ((*m)->flags & MF_MAP_DESTROYED){
	XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
	r = load_map(peer, pr, target, targetlen);
343 * Object handling functions
/* Look up the map_node for a given object index in map->objects. */
struct map_node *find_object(struct map *map, uint64_t obj_index)
	int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert a map_node into map->objects keyed by its objectidx.
 * Grows the hash table on -XHASH_ERESIZE and retries once. */
static int insert_object(struct map *map, struct map_node *mn)
	//FIXME no find object first
	int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		unsigned long shift = xhash_grow_size_shift(map->objects);
		map->objects = xhash_resize(map->objects, shift, NULL);
		r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
371 * map read/write functions
/* Decode one pithos (type-0) map entry: `buf` holds a raw SHA-256 digest;
 * hexlify it into mn->object and mark the object as existing. */
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
	//hexlify sha256 value
	for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
		sprintf(mn->object+2*i, "%02x", buf[i]);
	/* sprintf above already NUL-terminated; this makes it explicit */
	mn->object[SHA256_DIGEST_SIZE * 2] = 0;
	mn->objectlen = SHA256_DIGEST_SIZE * 2;
	mn->flags = MF_OBJECT_EXIST;
/* Decode one native (type-1) map entry: buf[0] is the transparency byte
 * (object exists?), followed by a fixed-width object name. */
static inline void map_to_object(struct map_node *mn, char *buf)
		mn->flags |= MF_OBJECT_EXIST;
	memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
	mn->object[XSEG_MAX_TARGETLEN] = 0;
	/* name field is zero-padded on disk, so strlen yields the real length */
	mn->objectlen = strlen(mn->object);
/* Encode a map_node into its on-disk form: transparency byte followed by
 * the object name, zero-padded to objectsize_in_map. */
static inline void object_to_map(char* buf, struct map_node *mn)
	buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
	memcpy(buf+1, mn->object, mn->objectlen);
	memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
/* Serialize the map header into buf: magic SHA-256 marker (identifies a
 * native map) followed by the volume size (see mapheader_size). */
static inline void mapheader_to_map(struct map *m, char *buf)
	memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
	pos += SHA256_DIGEST_SIZE;
	memcpy(buf + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);
/* Asynchronously persist a single map entry: submit an X_WRITE of the
 * serialized map_node at its slot offset within the map file, addressed
 * to the map blocker (mbportno). Completion lands in handle_objectwrite. */
static int object_write(struct peerd *peer, struct peer_req *pr,
				struct map *map, struct map_node *mn)
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
							mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	int r = xseg_prep_request(peer->xseg, req, mn->objectlen, objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, map->volumelen);
	req->size = objectsize_in_map;
	/* entry lives after the header, at a fixed-width slot per object index */
	req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
	char *data = xseg_get_data(peer->xseg, req);
	object_to_map(data, mn);
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	r = xseg_signal(peer->xseg, p);
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	XSEGLOG2(&lc, I, "Writing object %s \n\t"
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	/* --- error unwind: release the request --- */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, peer->portno);
	XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
			mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Asynchronously persist the whole map: serialize header plus every object
 * entry into one X_WRITE to the map blocker, mark the map MF_MAP_WRITING.
 * Completion is handled in handle_mapwrite. */
static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
	struct mapperd *mapper = __get_mapperd(peer);
	uint64_t i, pos, max_objidx = calc_map_obj(map);
	struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
							mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
	/* payload: header + one fixed-width slot per object */
	int r = xseg_prep_request(peer->xseg, req, map->volumelen,
				mapheader_size + max_objidx * objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);
	mapheader_to_map(map, data);
	pos = mapheader_size;
	req->size = req->datalen;
	if (map->size % block_size)
	/* serialize every object entry; all must be present in map->objects */
	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
			XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
					(unsigned long long) i, map->volume);
		object_to_map(data+pos, mn);
		pos += objectsize_in_map;
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	map->flags |= MF_MAP_WRITING;
	XSEGLOG2(&lc, I, "Writing map %s", map->volume);
	/* --- error unwind: release the request --- */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, peer->portno);
	XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Parse a freshly read map block into map_node entries.
 * An all-zero leading digest means the map does not exist.
 * Type 1 (buf starts with magic_sha256): native format — header carries
 * the volume size; entries are (transparency byte + name) slots.
 * Type 0: legacy pithos format — entries are raw SHA-256 digests, read
 * until an all-zero digest; map size is derived from the entry count. */
static int read_map (struct peerd *peer, struct map *map, char *buf)
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);
	/* r != 0 here means the map block is empty (non-existent map) */
	int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
	//type 1, our type, type 0 pithos map
	int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
	XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
	struct map_node *map_node;
	/* --- type 1: native map --- */
	pos = SHA256_DIGEST_SIZE;
	map->size = *(uint64_t *) (buf + pos);
	pos += sizeof(uint64_t);
	nr_objs = map->size / block_size;
	if (map->size % block_size)
	map_node = calloc(nr_objs, sizeof(struct map_node));
	for (i = 0; i < nr_objs; i++) {
		map_node[i].map = map;
		map_node[i].objectidx = i;
		xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
		map_to_object(&map_node[i], buf + pos);
		pos += objectsize_in_map;
		r = insert_object(map, &map_node[i]); //FIXME error check
	/* --- type 0: pithos map --- */
	uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
	map_node = calloc(max_nr_objs, sizeof(struct map_node));
	for (i = 0; i < max_nr_objs; i++) {
		/* an all-zero digest terminates the entry list */
		if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
		map_node[i].objectidx = i;
		map_node[i].map = map;
		xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
		pithosmap_to_object(&map_node[i], buf + pos);
		pos += SHA256_DIGEST_SIZE;
		r = insert_object(map, &map_node[i]); //FIXME error check
	map->size = i * block_size;
	XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
	//FIXME cleanup on error
/* Associate (mn != NULL) or dissociate (mn == NULL) a map_node with an
 * in-flight copy-up/write request in mio->copyups_nodes, resizing the
 * table on -XHASH_ERESIZE. Also caches mn in mio->copyup_node. */
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
	/* mn set: record the association */
	r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
		xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
		mio->copyups_nodes = new_hashmap;
		r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
	/* mn NULL: drop the association */
	r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
	if (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
		xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
		mio->copyups_nodes = new_hashmap;
		r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
	mio->copyup_node = mn;
/* Fetch the map_node associated with a copy-up request; falls back to the
 * cached mio->copyup_node when the hash lookup fails. */
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
	int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
	return mio->copyup_node;
/* Start a copy-on-write of `mn`: derive the new object name as the
 * hexlified SHA-256 of "<old name><objectidx>", then submit an X_COPY
 * (old object -> new name) to the data blocker (bportno). The request is
 * registered in mio->copyups_nodes so the reply can find mn again. */
static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	//struct sha256_ctx sha256ctx;
	uint32_t newtargetlen;
	char new_target[XSEG_MAX_TARGETLEN + 1];
	unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
	char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
	/* new_object = old name + decimal objectidx (uniquifies the hash input) */
	strncpy(new_object, mn->object, mn->objectlen);
	sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
	new_object[XSEG_MAX_TARGETLEN + 19] = 0;
	/* hash and hexlify to produce the new target name */
	gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
	for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
		sprintf (new_target + 2*i, "%02x", buf[i]);
	newtargetlen = SHA256_DIGEST_SIZE  * 2;
	struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
							mapper->bportno, X_ALLOC);
	r = xseg_prep_request(peer->xseg, req, newtargetlen,
				sizeof(struct xseg_request_copy));
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, new_target, newtargetlen);
	/* copy source is carried in the request payload */
	struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
	strncpy(xcopy->target, mn->object, mn->objectlen);
	xcopy->targetlen = mn->objectlen;
	req->size = block_size;
	r = xseg_set_req_data(peer->xseg, req, pr);
	/* remember which node this request copies up */
	r = __set_copyup_node(mio, req, mn);
	p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
	xseg_signal(peer->xseg, p);
	XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
	/* --- error unwind: release the request --- */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, peer->portno);
	XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
721 * request handling functions
/* Reply handler for a map-load X_READ. On success, parse the block with
 * read_map, clear MF_MAP_LOADING and re-dispatch every pr queued on the
 * map. On failure, fail the waiters and unregister the map. */
static int handle_mapread(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	char buf[XSEG_MAX_TARGETLEN];
	struct mapperd *mapper = __get_mapperd(peer);
	//assert req->op = X_READ;
	char *target = xseg_get_target(peer->xseg, req);
	struct map *map = find_map(mapper, target, req->targetlen);
	//assert map->flags & MF_MAP_LOADING
	if (req->state & XS_FAILED)
	char *data = xseg_get_data(peer->xseg, req);
	r = read_map(peer, map, data);
	/* success: the map is now usable; wake everything that waited on it */
	xseg_put_request(peer->xseg, req, peer->portno);
	map->flags &= ~MF_MAP_LOADING;
	XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
	while((idx = __xq_pop_head(&map->pending)) != Noneidx){
		struct peer_req *preq = (struct peer_req *) idx;
		my_dispatch(peer, preq, preq->req);
	/* --- load failed: drain waiters and drop the map --- */
	XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
	xseg_put_request(peer->xseg, req, peer->portno);
	map->flags &= ~MF_MAP_LOADING;
	while((idx = __xq_pop_head(&map->pending)) != Noneidx){
		struct peer_req *preq = (struct peer_req *) idx;
	remove_map(mapper, map);
	//FIXME not freeing up all objects + object hash
	/* --- no map registered for this reply's target --- */
	strncpy(buf, target, req->targetlen);
	buf[req->targetlen] = 0;
	XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
	xseg_put_request(peer->xseg, req, peer->portno);
/* Reply handler for a whole-map X_WRITE (see map_write). On success,
 * clear MF_MAP_WRITING and re-dispatch queued prs; on failure, drain the
 * waiters and unregister the map. */
static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	char buf[XSEG_MAX_TARGETLEN];
	struct mapperd *mapper = __get_mapperd(peer);
	//assert req->op = X_WRITE;
	char *target = xseg_get_target(peer->xseg, req);
	struct map *map = find_map(mapper, target, req->targetlen);
		fprintf(stderr, "couldn't find map\n");
	//assert map->flags & MF_MAP_WRITING
	if (req->state & XS_FAILED){
		fprintf(stderr, "write request failed\n");
	/* success: map persisted; wake waiters */
	xseg_put_request(peer->xseg, req, peer->portno);
	map->flags &= ~MF_MAP_WRITING;
	XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
	while((idx = __xq_pop_head(&map->pending)) != Noneidx){
		struct peer_req *preq = (struct peer_req *) idx;
		my_dispatch(peer, preq, preq->req);
	/* --- write failed: drain waiters and drop the map --- */
	XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
	xseg_put_request(peer->xseg, req, peer->portno);
	map->flags &= ~MF_MAP_WRITING;
	while((idx = __xq_pop_head(&map->pending)) != Noneidx){
		struct peer_req *preq = (struct peer_req *) idx;
	remove_map(mapper, map);
	//FIXME not freeing up all objects + object hash
	/* --- no map registered for this reply's target --- */
	strncpy(buf, target, req->targetlen);
	buf[req->targetlen] = 0;
	XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
	xseg_put_request(peer->xseg, req, peer->portno);
/* Handle an X_CLONE request (or the X_WRITE replies it spawns):
 * load the base map, build a clone map that shares the base's objects
 * (unshared slots point at zero_block), insert it and persist it with
 * map_write. The pr completes when the clone map write finishes. */
static int handle_clone(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	char buf[XSEG_MAX_TARGETLEN + 1];
	if (pr->req->op != X_CLONE) {
		XSEGLOG2(&lc, E, "Unknown op %u", req->op);
	/* re-entered with the clone map's write reply */
	if (req->op == X_WRITE){
		//assert state = WRITING;
		r = handle_mapwrite(peer, pr ,req);
			XSEGLOG2(&lc, E, "handle mapwrite returned error");
	/* re-dispatched after the clone map write completed: finish the pr */
	if (mio->state == WRITING) {
		target = xseg_get_target(peer->xseg, pr->req);
		strncpy(buf, target, req->targetlen);
		buf[req->targetlen] = 0;
		XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
	/* first pass: resolve the base map named in the clone payload */
	struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
	r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
	else if (r == MF_PENDING)
	if (map->flags & MF_MAP_DESTROYED) {
		strncpy(buf, xclone->target, xclone->targetlen);
		buf[xclone->targetlen] = 0;
		XSEGLOG2(&lc, W, "Map %s destroyed", buf);
		target = xseg_get_target(peer->xseg, pr->req);
		strncpy(buf, target, req->targetlen);
		buf[req->targetlen] = 0;
		XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
	/* allocate the clone map */
	struct map *clonemap = malloc(sizeof(struct map));
	/* NOTE(review): the three lines below read like the body of a
	 * commented-out FIXME ("check if clone map exists") — the comment
	 * delimiters appear to be on elided lines; confirm against the
	 * original file. */
	FIXME check if clone map exists
	find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
	... (on destroyed what ??
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(buf, target, req->targetlen);
	buf[req->targetlen] = 0;
	XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
	//alloc and init struct map
	clonemap->objects = xhash_new(3, INTEGER);
	if (!clonemap->objects){
		goto out_err_clonemap;
	xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
		goto out_err_objhash;
	/* requested size must not shrink below the base map's size */
	if (xclone->size < map->size) {
		target = xseg_get_target(peer->xseg, pr->req);
		strncpy(buf, target, req->targetlen);
		buf[req->targetlen] = 0;
		XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
				"\n\t for requested clone %s",
				(unsigned long long) xclone->size,
				(unsigned long long) map->size, buf);
	/* size == -1 means "same size as base" */
	if (xclone->size == -1)
		clonemap->size = map->size;
		clonemap->size = xclone->size;
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(clonemap->volume, target, pr->req->targetlen);
	clonemap->volumelen = pr->req->targetlen;
	clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
	//alloc and init map_nodes
	unsigned long c = clonemap->size/block_size + 1;
	struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
	/* share base objects where present; otherwise point at zero_block */
	for (i = 0; i < clonemap->size/block_size + 1; i++) {
		struct map_node *mn = find_object(map, i);
			strncpy(map_nodes[i].object, mn->object, mn->objectlen);
			map_nodes[i].objectlen = mn->objectlen;
			strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
			map_nodes[i].objectlen = strlen(zero_block);
		map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
		map_nodes[i].flags = 0;
		map_nodes[i].objectidx = i;
		map_nodes[i].map = clonemap;
		xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
		r = insert_object(clonemap, &map_nodes[i]);
	/* register and persist the clone */
	r = insert_map(mapper, clonemap);
		XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
	r = map_write(peer, pr, clonemap);
		XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
	else if (r == MF_PENDING) {
		//maybe move this to map_write
		XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
		__xq_append_tail(&clonemap->pending, (xqindex) pr);
		mio->state = WRITING;
	XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
	/* --- error unwind --- */
	remove_map(mapper, clonemap);
	//FIXME not freeing allocated queues of map_nodes
	xq_free(&clonemap->pending);
	xhash_free(clonemap->objects);
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(buf, target, req->targetlen);
	buf[req->targetlen] = 0;
	XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
/* Translate a client I/O request into a scatterlist of (object, offset,
 * size) segments, written back into pr->req as an xseg_reply_map.
 * For writes, any target object that does not exist yet triggers a
 * copy-up (copy-on-write) and the pr is parked on that object's pending
 * queue; any busy object (MF_OBJECT_NOT_READY) parks the pr likewise. */
static int req2objs(struct peerd *peer, struct peer_req *pr,
			struct map *map, int write)
	char *target = xseg_get_target(peer->xseg, pr->req);
	uint32_t nr_objs = calc_nr_obj(pr->req);
	uint64_t size = sizeof(struct xseg_reply_map) +
			nr_objs * sizeof(struct xseg_reply_map_scatterlist);
	/* resize request to fit reply */
	/* target is stored inside the request buffer, so save it across resize */
	char buf[XSEG_MAX_TARGETLEN];
	strncpy(buf, target, pr->req->targetlen);
	int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
		XSEGLOG2(&lc, E, "Cannot resize request");
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, buf, pr->req->targetlen);
	/* structure reply */
	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
	reply->cnt = nr_objs;
	/* first segment: may start mid-object */
	uint64_t rem_size = pr->req->size;
	uint64_t obj_index = pr->req->offset / block_size;
	uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
	uint64_t obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
	struct map_node * mn = find_object(map, obj_index);
		XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
		XSEGLOG2(&lc, E, "pr->req->offset: %llu, block_size %u\n",
				(unsigned long long) pr->req->offset,
	if (write && (mn->flags & MF_OBJECT_NOT_READY))
		goto out_object_copying;
	if (write && !(mn->flags & MF_OBJECT_EXIST)) {
		//calc new_target, copy up object
		r = copyup_object(peer, mn, pr);
			XSEGLOG2(&lc, E, "Error in copy up object");
		mn->flags |= MF_OBJECT_COPYING;
		goto out_object_copying;
	strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
	reply->segs[idx].targetlen = mn->objectlen;
	reply->segs[idx].target[mn->objectlen] = 0;
	reply->segs[idx].offset = obj_offset;
	reply->segs[idx].size = obj_size;
	rem_size -= obj_size;
	/* remaining segments start at offset 0 of each subsequent object */
	while (rem_size > 0) {
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
		mn = find_object(map, obj_index);
			XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
		if (write && (mn->flags & MF_OBJECT_NOT_READY))
			goto out_object_copying;
		if (write && !(mn->flags & MF_OBJECT_EXIST)) {
			//calc new_target, copy up object
			r = copyup_object(peer, mn, pr);
				XSEGLOG2(&lc, E, "Error in copy up object");
			mn->flags |= MF_OBJECT_COPYING;
			goto out_object_copying;
		strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
		reply->segs[idx].targetlen = mn->objectlen;
		reply->segs[idx].target[mn->objectlen] = 0;
		reply->segs[idx].offset = obj_offset;
		reply->segs[idx].size = obj_size;
	/* --- object busy or copying: park pr on the object's queue --- */
	//printf("r2o mn: %lx\n", mn);
	//printf("volume %s pending on %s\n", map->volume, mn->object);
	if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
		XSEGLOG2(&lc, E, "Cannot append pr to tail");
	XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
			mn->object, (unsigned long long) mn->objectidx, map->volume);
/* Handle an X_MAPR (map-read) client request: resolve the map (possibly
 * loading it asynchronously) and translate the request range to object
 * segments via req2objs with write == 0. */
static int handle_mapr(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
	else if (r == MF_PENDING)
	if (map->flags & MF_MAP_DESTROYED) {
	r = req2objs(peer, pr, map, 0);
		XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
/* Reply handler for an X_COPY copy-up (see copyup_object). On success,
 * build a temporary map_node carrying the new object name and persist it
 * with object_write (pr stays pending, MF_OBJECT_WRITING set). On failure,
 * release the request and fail every pr parked on the object. */
static int handle_copyup(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	/* find which map_node this copy-up belonged to */
	struct map_node *mn = __get_copyup_node(mio, req);
	mn->flags &= ~MF_OBJECT_COPYING;
	if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
		XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
	struct map *map = mn->map;
		XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
	/* construct a tmp map_node for writing purposes */
	char *target = xseg_get_target(peer->xseg, req);
	struct map_node newmn = *mn;
	newmn.flags = MF_OBJECT_EXIST;
	strncpy(newmn.object, target, req->targetlen);
	newmn.object[req->targetlen] = 0;
	newmn.objectlen = req->targetlen;
	newmn.objectidx = mn->objectidx;
	/* persist the new mapping; MF_PENDING is the expected in-flight result */
	r = object_write(peer, pr, map, &newmn);
	if (r != MF_PENDING){
		XSEGLOG2(&lc, E, "Object write returned error for object %s"
				"\n\t of map %s [%llu]",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	mn->flags |= MF_OBJECT_WRITING;
	xseg_put_request(peer->xseg, req, peer->portno);
	XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
	/* --- failure: drop request, detach node, fail parked prs --- */
	xseg_put_request(peer->xseg, req, peer->portno);
	__set_copyup_node(mio, req, NULL);
	while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
		struct peer_req * preq = (struct peer_req *) idx;
	XSEGLOG2(&lc, E, "Cannot get map node");
/* Reply handler for the per-object map-entry X_WRITE issued after a
 * copy-up (see object_write). On success, commit the new object name into
 * the live map_node and re-dispatch every pr parked on it; on failure,
 * fail the parked prs. */
static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	//assert req->op = X_WRITE;
	char *target = xseg_get_target(peer->xseg, req);
	//printf("handle object write replyi\n");
	struct map_node *mn = __get_copyup_node(mio, req);
	__set_copyup_node(mio, req, NULL);
	//assert mn->flags & MF_OBJECT_WRITING
	mn->flags &= ~MF_OBJECT_WRITING;
	if (req->state & XS_FAILED)
	/* decode what was written so the in-memory node matches the disk */
	struct map_node tmp;
	char *data = xseg_get_data(peer->xseg, req);
	map_to_object(&tmp, data);
	mn->flags |= MF_OBJECT_EXIST;
	/* after the copy-up/write cycle no other state bit should remain */
	if (mn->flags != MF_OBJECT_EXIST){
		XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
	//assert mn->flags & MF_OBJECT_EXIST
	strncpy(mn->object, tmp.object, tmp.objectlen);
	mn->object[tmp.objectlen] = 0;
	mn->objectlen = tmp.objectlen;
	xseg_put_request(peer->xseg, req, peer->portno);
	XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
	while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
		struct peer_req * preq = (struct peer_req *) idx;
		my_dispatch(peer, preq, preq->req);
	/* --- write failed: fail every pr parked on this object --- */
	XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
	xseg_put_request(peer->xseg, req, peer->portno);
	while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
		struct peer_req *preq = (struct peer_req *) idx;
	/* --- no map node associated with this reply --- */
	XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
	xseg_put_request(peer->xseg, req, peer->portno);
/* Handle an X_MAPW (map-write) client request, plus the internal replies
 * it spawns: X_COPY replies go to handle_copyup, X_WRITE replies to
 * handle_objectwrite; otherwise resolve the map and translate the range
 * via req2objs with write == 1 (which may start copy-ups). */
static int handle_mapw(struct peerd *peer, struct peer_req *pr,
				struct xseg_request *req)
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	/* handle copy up replies separately */
	if (req->op == X_COPY){
		if (handle_copyup(peer, pr, req) < 0){
			XSEGLOG2(&lc, E, "Handle copy up returned error");
	else if(req->op == X_WRITE){
		/* handle replies of object write operations */
		if (handle_objectwrite(peer, pr, req) < 0) {
			XSEGLOG2(&lc, E, "Handle object write returned error");
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
	else if (r == MF_PENDING)
	if (map->flags & MF_MAP_DESTROYED) {
	r = req2objs(peer, pr, map, 1);
		XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
	//else copyup pending, wait for pr restart
// Snapshot request handler. Body not visible here; currently unreachable
// from my_dispatch, where the X_SNAPSHOT case is commented out.
1327 static int handle_snap(struct peerd *peer, struct peer_req *pr,
1328 struct xseg_request *req)
// Serve an X_INFO request: locate (or load) the map named by pr's target
// and reply with the volume size through struct xseg_reply_info.
1334 static int handle_info(struct peerd *peer, struct peer_req *pr,
1335 struct xseg_request *req)
1337 struct mapperd *mapper = __get_mapperd(peer);
1338 struct mapper_io *mio = __get_mapper_io(pr);
1341 char *target = xseg_get_target(peer->xseg, pr->req);
1346 //printf("Handle info\n");
1348 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
// MF_PENDING: map load in flight; pr will be re-dispatched later --
// presumably queued inside find_or_load_map (not visible here).
1353 else if (r == MF_PENDING)
// A destroyed map cannot answer info requests (error path elided).
1355 if (map->flags & MF_MAP_DESTROYED) {
// Map ready: write the size into the reply payload of the client request.
1360 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1361 xinfo->size = map->size;
// Issue a deletion request for one object (map_node) of a map, on behalf
// of pr. If the object still has pending work, pr is parked on the
// object's queue and mio->delobj records which object to resume from when
// pr is re-dispatched.
1367 static int delete_object(struct peerd *peer, struct peer_req *pr,
1368 struct map_node *mn)
1371 struct mapperd *mapper = __get_mapperd(peer);
1372 struct mapper_io *mio = __get_mapper_io(pr);
1374 if (xq_count(&mn->pending) != 0) {
1375 mio->delobj = mn->objectidx;
1376 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1377 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
// Build a request toward the data blocker (mapper->bportno) targeting the
// object's name. req->op is set in elided code -- presumably X_DELETE;
// TODO confirm.
1382 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
1383 mapper->bportno, X_ALLOC);
1386 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1389 char *target = xseg_get_target(peer->xseg, req);
1390 strncpy(target, mn->object, req->targetlen);
1392 req->size = req->datalen;
// Attach pr as the request's private data so the reply can be routed back,
// and record the (req -> map_node) association for handle_delete().
1395 r = xseg_set_req_data(peer->xseg, req, pr);
1398 __set_copyup_node(mio, req, mn);
1399 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1402 r = xseg_signal(peer->xseg, p);
1403 mn->flags |= MF_OBJECT_DELETING;
1404 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
// Error path: detach private data and return the request to the pool.
1408 xseg_get_req_data(peer->xseg, req, &dummy);
1410 xseg_put_request(peer->xseg, req, peer->portno);
1412 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
// Completion handler for a single object deletion during map destruction.
// Per the original comments below, an object-level failure does NOT abort
// the overall map deletion (the map block itself is already gone), so err
// is deliberately not checked.
1415 static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
1416 struct map_node *mn, int err)
1418 struct mapperd *mapper = __get_mapperd(peer);
1419 struct mapper_io *mio = __get_mapper_io(pr);
1421 struct map *map = mn->map;
1424 //if object deletion failed, map deletion must continue
1425 //and report OK, since map block has been deleted succesfully
1426 //so, no check for err
1428 //assert object flags OK
1429 //free map_node_resources
// NOTE(review): this clears MF_OBJECT_DELETING on map->flags, not on
// mn->flags. MF_OBJECT_DELETING and MF_MAP_DELETING happen to share the
// same bit (1 << 3), so this also drops MF_MAP_DELETING from the map.
// Probably intended: mn->flags &= ~MF_OBJECT_DELETING; -- verify.
1430 map->flags &= ~MF_OBJECT_DELETING;
1431 xq_free(&mn->pending);
// Advance to the next still-existing object past the one just deleted
// (gaps are skipped; bounds are limited by calc_map_obj(map)).
1433 idx = mn->objectidx;
1434 //remove_object(map, idx);
1436 mn = find_object(map, idx);
1437 while (!mn && idx < calc_map_obj(map)) {
1439 mn = find_object(map, idx);
1442 //delete next object or complete;
// More objects remain: chain the deletion of the next one.
1443 r = delete_object(peer, pr, mn);
1445 XSEGLOG2(&lc, E, "Object %s delete object return error"
1446 "\n\t Map: %s [%llu]",
1447 mn->object, mn->map->volume,
1448 (unsigned long long) mn->objectidx);
1451 XSEGLOG2(&lc, I, "Handle object delete OK");
// All objects gone: finalize the map as destroyed and wake every request
// still queued on it (they will observe MF_MAP_DESTROYED on re-dispatch).
1454 //assert map flags OK
1455 map->flags &= ~MF_MAP_DELETING;
1456 map->flags |= MF_MAP_DESTROYED;
1457 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1458 //make all pending requests on map to fail
1459 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1460 struct peer_req * preq = (struct peer_req *) idx;
1461 my_dispatch(peer, preq, preq->req);
1463 //free map resources;
1464 remove_map(mapper, map);
// Presumably the remaining map_node resources are released in elided code
// starting from object 0 -- TODO confirm.
1465 mn = find_object(map, 0);
1467 xq_free(&map->pending);
// Issue deletion of the map block itself, via the maps blocker
// (mapper->mbportno). Mirrors delete_object(), but registers a NULL
// copyup node so handle_delete() can tell a map-block reply apart from an
// object reply. req->op is set in elided code -- presumably X_DELETE.
1473 static int delete_map(struct peerd *peer, struct peer_req *pr,
1477 struct mapperd *mapper = __get_mapperd(peer);
1478 struct mapper_io *mio = __get_mapper_io(pr);
1479 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
1480 mapper->mbportno, X_ALLOC);
1483 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1486 char *target = xseg_get_target(peer->xseg, req);
1487 strncpy(target, map->volume, req->targetlen);
1489 req->size = req->datalen;
// Attach pr so the reply routes back; NULL marks "map block" (see above).
1492 r = xseg_set_req_data(peer->xseg, req, pr);
1495 __set_copyup_node(mio, req, NULL);
1496 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1499 r = xseg_signal(peer->xseg, p);
1500 map->flags |= MF_MAP_DELETING;
1501 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
// Error path: detach private data and return the request to the pool.
1505 xseg_get_req_data(peer->xseg, req, &dummy);
1507 xseg_put_request(peer->xseg, req, peer->portno);
1509 XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
// Completion handler for the map-block deletion issued by delete_map().
// On failure: clear MF_MAP_DELETING and re-dispatch everything queued on
// the map (those requests will then observe the failure). On success:
// mark the map destroyed and start deleting its objects one by one,
// beginning at object 0.
1513 static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
1514 struct map *map, int err)
1516 struct mapperd *mapper = __get_mapperd(peer);
1517 struct mapper_io *mio = __get_mapper_io(pr);
1521 map->flags &= ~MF_MAP_DELETING;
1523 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1524 //dispatch all pending
1525 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1526 struct peer_req * preq = (struct peer_req *) idx;
1527 my_dispatch(peer, preq, preq->req);
1530 map->flags |= MF_MAP_DESTROYED;
1531 //delete all objects
// NOTE(review): logged at E (error) level although this is the success
// path -- probably should be I.
1532 XSEGLOG2(&lc, E, "Map %s map block deleted. Deleting objects", map->volume);
1533 struct map_node *mn = find_object(map, 0);
// Defensive branch: a map with no object 0 should not occur; fail all
// pending requests and tear the map down immediately.
1535 XSEGLOG2(&lc, E, "Map %s has no object 0", map->volume);
1536 //this should never happen
1537 //make all pending requests on map to fail
1538 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1539 struct peer_req * preq = (struct peer_req *) idx;
1540 my_dispatch(peer, preq, preq->req);
1542 //free map resources;
1543 remove_map(mapper, map);
1544 xq_free(&map->pending);
// Kick off deletion of the first object; the rest are chained from
// handle_object_delete() as each deletion completes.
1548 r = delete_object(peer, pr, mn);
1550 XSEGLOG2(&lc, E, "Deleting first object of map %s returned error"
1551 "\n\t Dispatching pending requests",
1553 //dispatch all pending
1554 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1555 struct peer_req * preq = (struct peer_req *) idx;
1556 my_dispatch(peer, preq, preq->req);
// Route a reply to an X_DELETE request this mapper issued. The copyup-node
// hash distinguishes the two cases: a NULL entry means the request deleted
// the map block itself (delete_map); a non-NULL map_node means it deleted
// one of the map's objects (delete_object).
1563 static int handle_delete(struct peerd *peer, struct peer_req *pr,
1564 struct xseg_request *req)
1566 struct mapperd *mapper = __get_mapperd(peer);
1567 struct mapper_io *mio = __get_mapper_io(pr);
1568 struct map_node *mn;
// err := request failed without ever being served.
1571 if (req->state & XS_FAILED && !(req->state &XS_SERVED))
1574 mn = __get_copyup_node(mio, req);
1575 __set_copyup_node(mio, req, NULL);
1576 char *target = xseg_get_target(peer->xseg, req);
// Map-block reply: recover the map from the request target.
1579 map = find_map(mapper, target, req->targetlen);
1581 xseg_put_request(peer->xseg, req, peer->portno);
1584 handle_map_delete(peer, pr, map, err);
// The xseg request is returned to the pool before/after handing control to
// the specific completion handler (exact ordering partly elided).
1589 xseg_put_request(peer->xseg, req, peer->portno);
1592 handle_object_delete(peer, pr, mn, err);
1594 xseg_put_request(peer->xseg, req, peer->portno);
// Handle a client X_DELETE (destroy) request, both on first dispatch and
// on every re-dispatch while the multi-step deletion progresses.
// pr->req != req means req is a reply to a request WE issued for pr --
// those are forwarded to handle_delete().
1598 static int handle_destroy(struct peerd *peer, struct peer_req *pr,
1599 struct xseg_request *req)
1601 struct mapperd *mapper = __get_mapperd(peer);
1602 struct mapper_io *mio = __get_mapper_io(pr);
1606 if (pr->req != req && req->op == X_DELETE) {
1607 //assert mio->state == DELETING
1608 r = handle_delete(peer, pr, req);
1610 XSEGLOG2(&lc, E, "Handle delete returned error");
// Fresh/resumed client request: resolve the target map first.
1619 char *target = xseg_get_target(peer->xseg, pr->req);
1620 r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1625 else if (r == MF_PENDING)
// Destroyed map: if WE were deleting it, this is the successful finish;
// otherwise the volume was already destroyed before this request.
1627 if (map->flags & MF_MAP_DESTROYED) {
1628 if (mio->state == DELETING){
1629 XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1633 XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1634 fprintf(stderr, "map destroyed\n");
// Resume an in-progress deletion from the object recorded in mio->delobj
// (set by delete_object when it had to wait on a busy object).
1639 if (mio->state == DELETING) {
1640 //continue deleting map objects;
1641 struct map_node *mn = find_object(map, mio->delobj);
1646 r = delete_object(peer, pr, mn);
// First step of a new deletion: remove the map block itself; objects are
// deleted afterwards by handle_map_delete()/handle_object_delete().
1653 r = delete_map(peer, pr, map);
1655 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1658 } else if (r == MF_PENDING) {
// Deletion submitted: park pr on the map and remember we are DELETING.
1659 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1660 __xq_append_tail(&map->pending, (xqindex) pr);
1661 mio->state = DELETING;
1665 XSEGLOG2(&lc, E, "Destroy unreachable");
// Central request router. X_READ replies are map-block reads and go to
// handle_mapread(); everything else is routed by the ORIGINAL client
// operation (pr->req->op, not req->op), so replies to self-issued requests
// resume the handler that started the work.
1670 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1672 struct mapperd *mapper = __get_mapperd(peer);
1674 struct mapper_io *mio = __get_mapper_io(pr);
1677 if (req->op == X_READ) {
1678 /* catch map reads requests here */
1679 handle_mapread(peer, pr, req);
1683 switch (pr->req->op) {
1684 /* primary xseg operations of mapper */
1685 case X_CLONE: handle_clone(peer, pr, req); break;
1686 case X_MAPR: handle_mapr(peer, pr, req); break;
1687 case X_MAPW: handle_mapw(peer, pr, req); break;
1688 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1689 case X_INFO: handle_info(peer, pr, req); break;
1690 case X_DELETE: handle_destroy(peer, pr, req); break;
// NOTE(review): typo in message -- "unknown up" should read "unknown op".
1691 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
// Public peer entry point called by the xseg peer framework. Initializes
// per-request mapper state (mio->state = ACCEPTED -- presumably only for
// newly accepted requests; guard lines elided) and delegates all routing
// to my_dispatch().
1696 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1698 struct mapperd *mapper = __get_mapperd(peer);
1700 struct mapper_io *mio = __get_mapper_io(pr);
1704 mio->state = ACCEPTED;
1705 my_dispatch(peer, pr ,req);
// Peer initialization: set up libgcrypt (thread callbacks, version check,
// no secure memory), precompute the magic-string SHA-256 and the hexlified
// SHA-256 of an all-zero block, allocate the mapperd state and one
// mapper_io per peer op, and parse command-line options:
//   -bp  <port>  blocker port for data objects
//   -mbp <port>  blocker port for map blocks
//   -t   <n>     thread count (only 1 supported; rejected otherwise)
1709 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1712 unsigned char buf[SHA256_DIGEST_SIZE];
// Thread callbacks must be registered before any other gcrypt call.
1715 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1717 /* Version check should be the very first call because it
1718 makes sure that important subsystems are intialized. */
1719 gcry_check_version (NULL);
1721 /* Disable secure memory. */
1722 gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1724 /* Tell Libgcrypt that initialization has completed. */
1725 gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1727 /* calculate out magic sha hash value */
1728 gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1730 /* calculate zero block */
1731 //FIXME check hash value
// NOTE(review): malloc result unchecked, and `zero` appears never freed
// unless released in elided code -- verify.
1732 zero = malloc(block_size);
1733 memset(zero, 0, block_size);
1734 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
// Hexlify the digest into the global zero_block (2 hex chars per byte).
1735 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1736 sprintf(zero_block + 2*i, "%02x", buf[i]);
1737 printf("%s \n", zero_block);
1740 //FIXME error checks
1741 struct mapperd *mapper = malloc(sizeof(struct mapperd));
1742 mapper->hashmaps = xhash_new(3, STRING);
1743 peer->priv = mapper;
// Per-op private state: each peer_req slot gets its own mapper_io with a
// (request -> map_node) hash for in-flight copy-ups/deletes.
1745 for (i = 0; i < peer->nr_ops; i++) {
1746 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1747 mio->copyups_nodes = xhash_new(3, INTEGER);
1750 peer->peer_reqs[i].priv = mio;
1753 for (i = 0; i < argc; i++) {
1754 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1755 mapper->bportno = atoi(argv[i+1]);
1759 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1760 mapper->mbportno = atoi(argv[i+1]);
1764 /* enforce only one thread */
1765 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1766 int t = atoi(argv[i+1]);
1768 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
// Debug helper: dump one map_node (index, name, name length, and whether
// the backing object exists) to stderr.
1781 void print_obj(struct map_node *mn)
1783 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
1784 (unsigned long long) mn->objectidx, mn->object,
1785 (unsigned int) mn->objectlen,
1786 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
// Debug helper: dump a map's header and every object node to stderr.
// nr_objs is size/block_size rounded up (the increment on a partial final
// block is in elided code -- the `if (m->size % block_size)` guard is
// visible below).
1789 void print_map(struct map *m)
1791 uint64_t nr_objs = m->size/block_size;
1792 if (m->size % block_size)
1794 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
1795 m->volume, m->volumelen,
1796 (unsigned long long) m->size,
1797 (unsigned long long) nr_objs);
1799 struct map_node *mn;
// Guard against runaway iteration from a corrupt/invalid volume size.
1800 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1802 for (i = 0; i < nr_objs; i++) {
1803 mn = find_object(m, i);
1805 printf("object idx [%llu] not found!\n", (unsigned long long) i);
1813 void test_map(struct peerd *peer)
1816 //struct sha256_ctx sha256ctx;
1817 unsigned char buf[SHA256_DIGEST_SIZE];
1818 char buf_new[XSEG_MAX_TARGETLEN + 20];
1819 struct map *m = malloc(sizeof(struct map));
1820 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
1821 m->volume[XSEG_MAX_TARGETLEN] = 0;
1822 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
1823 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
1824 m->volumelen = XSEG_MAX_TARGETLEN;
1825 m->size = 100*block_size;
1826 m->objects = xhash_new(3, INTEGER);
1827 struct map_node *map_node = calloc(100, sizeof(struct map_node));
1828 for (i = 0; i < 100; i++) {
1829 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
1830 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
1832 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
1833 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
1835 map_node[i].objectidx = i;
1836 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
1837 map_node[i].flags = MF_OBJECT_EXIST;
1838 ret = insert_object(m, &map_node[i]);
1841 char *data = malloc(block_size);
1842 mapheader_to_map(m, data);
1843 uint64_t pos = mapheader_size;
1845 for (i = 0; i < 100; i++) {
1846 map_node = find_object(m, i);
1848 printf("no object node %d \n", i);
1851 object_to_map(data+pos, map_node);
1852 pos += objectsize_in_map;
1856 struct map *m2 = malloc(sizeof(struct map));
1857 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
1858 m->volume[XSEG_MAX_TARGETLEN] = 0;
1859 m->volumelen = XSEG_MAX_TARGETLEN;
1861 m2->objects = xhash_new(3, INTEGER);
1862 ret = read_map(peer, m2, data);
1865 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
1867 while (sum < block_size) {
1868 r = write(fd, data + sum, block_size -sum);
1871 printf("write error\n");
1877 map_node = find_object(m, 0);