8 #include <sys/sha256.h>
9 #include <xtypes/xlock.h>
10 #include <xtypes/xhash.h>
11 #include <xseg/protocol.h>
/* On-disk map layout constants and process-wide hash values. */
17 /* hex representation of sha256 value takes up double the sha256 size */
18 #define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1)
/* fixed object/block size: 1 MiB */
20 #define block_size (1<<20)
21 #define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) /* transparency byte + max object len */
22 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
/* per-object (map node) flag bits */
24 #define MF_OBJECT_EXIST (1 << 0)
25 #define MF_OBJECT_COPYING (1 << 1)
/* hashed once at init (custom_peer_init) to produce magic_sha256, which tags our map format */
27 char *magic_string = "This a magic string. Please hash me";
28 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
29 char zero_block[SHA256_DIGEST_SIZE * 2 + 1]; /* hexlified sha256 hash value of a block full of zeros */
/* NOTE(review): fields of the per-object map node struct; the struct
 * declaration header is not visible in this excerpt. */
35 char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
36 struct xq pending; /* pending peer_reqs on this object */
/* per-map flag bits */
39 #define MF_MAP_LOADING (1 << 0)
40 #define MF_MAP_DESTROYED (1 << 1)
/* NOTE(review): fields of struct map (declaration header not visible here) */
46 char volume[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
47 xhash_t *objects; /* obj_index --> map_node */
48 struct xq pending; /* pending peer_reqs on this map */
/* NOTE(review): field of struct mapperd, the daemon-wide state (header not visible) */
53 xhash_t *hashmaps; // hash_function(target) --> struct map
/* NOTE(review): fields of struct mapper_io, the per-peer_req private state (header not visible) */
57 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
58 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
59 int err; /* error flag */
/* Return the daemon-wide mapper state stashed in the peer's private pointer. */
66 static inline struct mapperd * __get_mapperd(struct peerd *peer)
68 return (struct mapperd *) peer->priv;
/* Return the per-request mapper io state stashed in the peer_req's private pointer. */
71 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
73 return (struct mapper_io *) pr->priv;
/* Number of objects the map covers: size / block_size, rounded up.
 * (The rounding increment and return are on lines elided from this excerpt.) */
76 static inline uint64_t calc_map_obj(struct map *map)
78 uint64_t nr_objs = map->size / block_size;
79 if (map->size % block_size)
/* Count how many block_size objects an xseg request spans, taking into
 * account a possibly unaligned starting offset within the first object. */
84 static uint32_t calc_nr_obj(struct xseg_request *req)
87 uint64_t rem_size = req->size;
/* offset inside the first object; block_size is a power of two, so the
 * mask is equivalent to a modulo */
88 uint64_t obj_offset = req->offset & (block_size -1); //modulo
89 uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size;
91 while (rem_size > 0) {
/* NOTE(review): rem_size and block_size are uint64_t, so the difference
 * wraps and "rem_size - block_size > 0" is true for any rem_size != block_size,
 * including rem_size < block_size. Presumably "rem_size > block_size" was
 * intended — verify against the original file (decrement lines are elided). */
92 obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
101 * Maps handling functions
/* Look up a map by (target, targetlen) in the mapper's volume-name hash
 * table. Returns the map or NULL (tail of the function is elided). */
104 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
107 struct map *m = NULL;
108 char buf[XSEG_MAX_TARGET_LEN+1];
109 //assert targetlen <= XSEG_MAX_TARGET_LEN
/* NOTE(review): strncpy does not NUL-terminate when targetlen == buffer
 * capacity; termination presumably happens on an elided line — verify. */
110 strncpy(buf, target, targetlen);
112 r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
119 static int insert_map(struct mapperd *mapper, struct map *map)
123 if (find_map(mapper, map->volume, map->volumelen)){
124 //printf("map found in insert map\n");
128 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
129 if (r == -XHASH_ERESIZE) {
130 xhashidx shift = xhash_grow_size_shift(map->objects);
131 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
134 mapper->hashmaps = new_hashmap;
135 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
141 static int remove_map(struct mapperd *mapper, struct map *map)
145 //assert no pending pr on map
147 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
148 if (r == -XHASH_ERESIZE) {
149 xhashidx shift = xhash_shrink_size_shift(map->objects);
150 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
153 mapper->hashmaps = new_hashmap;
154 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/*
 * Start loading a map for (target, targetlen): allocate and register a
 * struct map in MF_MAP_LOADING state, queue pr on it, and issue an X_READ
 * of the first block to the blocker port. The reply is picked up later by
 * handle_mapread via dispatch. If a map already exists and is loading, pr
 * is simply queued; if it is already loaded, pr is re-dispatched.
 * Error-handling paths between the visible lines are elided in this excerpt.
 */
161 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
165 struct xseg_request *req;
166 struct mapperd *mapper = __get_mapperd(peer);
168 //printf("Loading map\n");
170 struct map *m = find_map(mapper, target, targetlen);
/* map not present: allocate and initialize a fresh one in loading state */
172 m = malloc(sizeof(struct map));
176 strncpy(m->volume, target, targetlen);
177 m->volume[XSEG_MAX_TARGET_LEN] = 0;
178 m->volumelen = targetlen;
179 m->flags = MF_MAP_LOADING;
180 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
184 m->objects = xhash_new(3, INTEGER); //FIXME err_check;
/* park the requesting pr until the map read completes */
187 __xq_append_tail(&m->pending, (xqindex) pr);
192 r = insert_map(mapper, m);
196 //printf("Loading map: preparing req\n");
/* issue an X_READ of one block_size chunk to the blocker port */
198 req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC);
202 r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
206 char *reqtarget = xseg_get_target(peer->xseg, req);
209 strncpy(reqtarget, target, targetlen);
211 req->size = block_size;
213 r = xseg_set_req_data(peer->xseg, req, pr);
216 p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
219 r = xseg_signal(peer->xseg, p);
221 //printf("Loading map: request issued\n");
/* failure path: tear down the request and the half-built map, failing
 * every pr queued on it */
225 xseg_get_req_data(peer->xseg, req, &dummy);
227 xseg_put_request(peer->xseg, req, peer->portno);
230 remove_map(mapper, m);
232 while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
233 fail(peer, (struct peer_req *) idx);
237 xhash_free(m->objects);
239 xq_free(&m->pending);
/* map already present: either queue on the in-flight load or re-dispatch */
246 //assert map loading when this is reached
247 if (m->flags & MF_MAP_LOADING) {
248 __xq_append_tail(&m->pending, (xqindex) pr);
251 dispatch(peer, pr, pr->req);
/*
 * Resolve (target, targetlen) to a loaded map in *m. If the map is being
 * loaded, queue pr on it; if absent, kick off load_map. The return-value
 * lines are elided here, but callers test for negative error and for
 * MF_PENDING (load in progress).
 */
257 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
258 char *target, uint32_t targetlen, struct map **m)
260 struct mapperd *mapper = __get_mapperd(peer);
262 //printf("find map or load\n");
263 *m = find_map(mapper, target, targetlen);
265 //printf("map found\n");
/* map exists but its backing block read has not completed yet: wait */
266 if ((*m)->flags & MF_MAP_LOADING) {
267 __xq_append_tail(&(*m)->pending, (xqindex) pr);
268 //printf("Map loading\n");
271 //printf("Map returned\n");
/* not found: start an asynchronous load */
275 r = load_map(peer, pr, target, targetlen);
282 * Object handling functions
/* Look up the map node for obj_index in the map's objects hash table.
 * (Return statements are elided from this excerpt.) */
285 struct map_node *find_object(struct map *map, uint64_t obj_index)
288 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert a map node into the map's objects table, growing the table and
 * retrying once on -XHASH_ERESIZE. */
294 static int insert_object(struct map *map, struct map_node *mn)
296 //FIXME no find object first
297 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
298 if (r == -XHASH_ERESIZE) {
299 unsigned long shift = xhash_grow_size_shift(map->objects);
/* NOTE(review): the old table pointer is overwritten directly here; a
 * NULL check for the resize result is presumably on an elided line —
 * verify, since xhash_resize failure would lose the table. */
300 map->objects = xhash_resize(map->objects, shift, NULL);
303 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
310 * map read/write functions
312 static inline void pithosmap_to_object(struct map_node *mn, char *buf)
315 //hexlify sha256 value
316 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
317 sprintf(mn->object, "%02x", buf[i]);
320 mn->object[XSEG_MAX_TARGET_LEN] = 0;
321 mn->objectlen = strlen(mn->object);
/*
 * Fill a map node from one of our (type 1) map entries: a transparency
 * byte followed by the NUL-padded object name.
 * NOTE(review): the MF_OBJECT_EXIST assignment below is presumably guarded
 * by a test of the transparency byte on an elided line — verify.
 */
325 static inline void map_to_object(struct map_node *mn, char *buf)
330 mn->flags |= MF_OBJECT_EXIST;
/* object name starts right after the transparency byte */
331 memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN);
332 mn->object[XSEG_MAX_TARGET_LEN] = 0;
333 mn->objectlen = strlen(mn->object);
336 static inline void object_to_map(char* buf, struct map_node *mn)
338 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
339 memcpy(buf+1, mn->object, mn->objectlen);
340 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGET_LEN - mn->objectlen); //zero out the rest of the buffer
/*
 * Write the map header into buf: the magic sha256 value followed by the
 * 64-bit volume size (mapheader_size bytes total). The declaration of
 * pos is on a line elided from this excerpt.
 */
343 static inline void mapheader_to_map(struct map *m, char *buf)
346 memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
347 pos += SHA256_DIGEST_SIZE;
348 memcpy(buf + pos, &m->size, sizeof(m->size));
349 pos += sizeof(m->size);
/*
 * Issue an X_WRITE(?) of a single serialized map entry to the blocker
 * port, at the entry's offset within the map file. pr is stored as the
 * request's private data so the reply is routed back through dispatch.
 * Error checks between the visible lines are elided in this excerpt.
 */
353 static int object_write(struct peerd *peer, struct peer_req *pr, struct map_node *mn)
356 struct mapperd *mapper = __get_mapperd(peer);
357 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
358 mapper->bportno, X_ALLOC);
361 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, objectsize_in_map);
364 char *target = xseg_get_target(peer->xseg, req);
365 strncpy(target, mn->object, mn->objectlen);
366 req->size = objectsize_in_map;
/* entry offset: header, then fixed-size entries indexed by objectidx */
367 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
369 char *data = xseg_get_data(peer->xseg, req);
370 object_to_map(data, mn);
372 r = xseg_set_req_data(peer->xseg, req, pr);
375 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
378 r = xseg_signal(peer->xseg, p);
/* failure path: detach pr and return the request */
383 xseg_get_req_data(peer->xseg, req, &dummy);
385 xseg_put_request(peer->xseg, req, peer->portno);
/*
 * Serialize the whole map (header + every object entry) into one request
 * buffer and submit it to the blocker port. Error checks between the
 * visible lines are elided in this excerpt.
 */
390 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
393 struct mapperd *mapper = __get_mapperd(peer);
395 uint64_t i, pos, max_objidx = calc_map_obj(map);
396 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
397 mapper->bportno, X_ALLOC);
400 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
401 mapheader_size + max_objidx * objectsize_in_map);
404 char *data = xseg_get_data(peer->xseg, req);
405 mapheader_to_map(map, data);
406 pos = mapheader_size;
/* NOTE(review): purpose of this remainder test is on an elided line;
 * calc_map_obj already rounds up — verify against the original file */
408 if (map->size % block_size)
410 for (i = 0; i < max_objidx; i++) {
411 mn = find_object(map, i);
414 object_to_map(data+pos, mn);
415 pos += objectsize_in_map;
417 r = xseg_set_req_data(peer->xseg, req, pr);
420 xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
423 r = xseg_signal(peer->xseg, p);
/* failure path: detach pr and return the request */
427 xseg_get_req_data(peer->xseg, req, &dummy);
429 xseg_put_request(peer->xseg, req, peer->portno);
/*
 * Parse a map block read from storage into map->objects.
 * Three cases, distinguished by the first SHA256_DIGEST_SIZE bytes:
 *   - all zeros: empty/uninitialized map;
 *   - equal to magic_sha256: our (type 1) format — header then fixed-size
 *     entries;
 *   - anything else: legacy pithos (type 0) format — raw digests until a
 *     zero digest terminator.
 */
434 static int read_map (struct peerd *peer, struct map *map, char *buf)
436 char nulls[SHA256_DIGEST_SIZE];
437 memset(nulls, 0, SHA256_DIGEST_SIZE);
/* r != 0 means the header is all zeros (empty map) */
439 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
444 //type 1, our type, type 0 pithos map
445 int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
448 struct map_node *map_node;
/* --- type 1: our format: magic hash, u64 size, then entries --- */
450 pos = SHA256_DIGEST_SIZE;
/* NOTE(review): unaligned/aliasing u64 read via pointer cast; presumably
 * the xseg data buffer is suitably aligned — verify */
451 map->size = *(uint64_t *) (buf + pos);
452 pos += sizeof(uint64_t);
453 nr_objs = map->size / block_size;
454 if (map->size % block_size)
456 map_node = calloc(nr_objs, sizeof(struct map_node));
460 for (i = 0; i < nr_objs; i++) {
461 map_node[i].objectidx = i;
462 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
463 map_to_object(&map_node[i], buf + pos);
464 pos += objectsize_in_map;
465 r = insert_object(map, &map_node[i]); //FIXME error check
/* --- type 0: pithos format: raw digests, zero digest terminates --- */
469 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
470 map_node = calloc(max_nr_objs, sizeof(struct map_node));
473 for (i = 0; i < max_nr_objs; i++) {
474 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
476 map_node[i].objectidx = i;
477 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
478 pithosmap_to_object(&map_node[i], buf + pos);
479 pos += SHA256_DIGEST_SIZE;
480 r = insert_object(map, &map_node[i]); //FIXME error check
/* i objects parsed before the terminator; derive the volume size */
482 map->size = i * block_size;
486 //FIXME cleanup on error
/*
 * Record or clear the (xseg_request --> map_node) association for an
 * in-flight copyup in mio->copyups_nodes. The visible code shows an
 * insert path and a delete path; the branch selecting between them
 * (presumably on mn != NULL) is on elided lines — verify.
 */
493 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
/* insert path: grow the table and retry once on -XHASH_ERESIZE */
497 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
498 if (r == -XHASH_ERESIZE) {
499 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
500 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
503 mio->copyups_nodes = new_hashmap;
504 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
/* delete path: shrink the table and retry once on -XHASH_ERESIZE */
508 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
509 if (r == -XHASH_ERESIZE) {
510 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
511 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
514 mio->copyups_nodes = new_hashmap;
515 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
/* Look up the map node associated with an in-flight copyup request.
 * (Return statements are elided from this excerpt.) */
522 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
525 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
/*
 * Issue an X_COPY that materializes a copy-on-write object: the new
 * object name is the hexlified sha256 of (source object name + objectidx),
 * and the copy source is the current object. The request is registered in
 * mio->copyups_nodes so handle_copyup can find the map node on reply.
 * Error checks between the visible lines are elided in this excerpt.
 */
531 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
533 struct mapperd *mapper = __get_mapperd(peer);
534 struct mapper_io *mio = __get_mapper_io(pr);
538 struct sha256_ctx sha256ctx;
539 uint32_t newtargetlen;
540 char new_target[XSEG_MAX_TARGET_LEN + 1];
541 unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
542 char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
543 strncpy(new_object, mn->object, mn->objectlen);
/* NOTE(review): objectidx appears to be 64-bit elsewhere (find_object
 * takes uint64_t; print_obj uses %llu) — "%u" here would mismatch the
 * argument type; verify and use PRIu64 if so */
544 sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
545 new_object[XSEG_MAX_TARGET_LEN + 19] = 0;
548 /* calculate new object name */
549 sha256_init_ctx(&sha256ctx);
550 sha256_process_bytes(new_object, strlen(new_object), &sha256ctx);
551 sha256_finish_ctx(&sha256ctx, buf);
/* buf is unsigned char here, so "%02x" emits exactly two digits */
552 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
553 sprintf (new_target + 2*i, "%02x", buf[i]);
554 newtargetlen = SHA256_DIGEST_SIZE * 2;
/* build and submit the X_COPY: target = new object, data = copy source */
557 struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
558 mapper->bportno, X_ALLOC);
561 r = xseg_prep_request(peer->xseg, req, newtargetlen,
562 sizeof(struct xseg_request_copy));
566 char *target = xseg_get_target(peer->xseg, req);
567 strncpy(target, new_target, newtargetlen);
569 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
570 strncpy(xcopy->target, mn->object, mn->objectlen);
571 xcopy->target[mn->objectlen] = 0;
574 req->size = block_size;
576 r = xseg_set_req_data(peer->xseg, req, pr);
/* remember which map node this copyup belongs to */
579 r = __set_copyup_node(mio, req, mn);
580 p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
585 xseg_signal(peer->xseg, p);
/* failure path: detach pr and return the request */
592 xseg_get_req_data(peer->xseg, req, &dummy);
594 xseg_put_request(peer->xseg, req, peer->portno);
600 * request handling functions
/*
 * Completion handler for the X_READ issued by load_map. On success, parse
 * the block with read_map, clear MF_MAP_LOADING and re-dispatch every
 * peer_req queued on the map; on failure, fail the queued peer_reqs and
 * remove the half-loaded map. Branch boundaries between the success and
 * failure paths are elided in this excerpt.
 */
603 static int handle_mapread(struct peerd *peer, struct peer_req *pr,
604 struct xseg_request *req)
608 struct mapperd *mapper = __get_mapperd(peer);
609 //assert req->op = X_READ;
610 char *target = xseg_get_target(peer->xseg, req);
611 struct map *map = find_map(mapper, target, req->targetlen);
614 //assert map->flags & MF_MAP_LOADING
616 if (req->state & XS_FAILED)
619 char *data = xseg_get_data(peer->xseg, req);
620 r = read_map(peer, map, data);
/* success: release the request and wake everything waiting on the map */
624 xseg_put_request(peer->xseg, req, peer->portno);
625 map->flags &= ~MF_MAP_LOADING;
626 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
627 struct peer_req *preq = (struct peer_req *) idx;
628 dispatch(peer, preq, preq->req);
/* failure: release the request, fail the waiters, drop the map */
633 xseg_put_request(peer->xseg, req, peer->portno);
634 map->flags &= ~MF_MAP_LOADING;
635 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
636 struct peer_req *preq = (struct peer_req *) idx;
639 remove_map(mapper, map);
644 xseg_put_request(peer->xseg, req, peer->portno);
/*
 * Handle an X_CLONE: load the source map named in the request data, then
 * build a new map for the request's target whose nodes reference either
 * the source map's objects or zero_block where the source has none, and
 * register it. Error checks between the visible lines are elided in this
 * excerpt.
 */
648 static int handle_clone(struct peerd *peer, struct peer_req *pr,
649 struct xseg_request *req)
651 struct mapperd *mapper = __get_mapperd(peer);
652 struct mapper_io *mio = __get_mapper_io(pr);
654 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
659 int r = find_or_load_map(peer, pr, xclone->target, strlen(xclone->target), &map);
662 else if (r == MF_PENDING)
665 if (map->flags & MF_MAP_DESTROYED) {
670 //alloc and init struct map
671 struct map *clonemap = malloc(sizeof(struct map));
675 clonemap->objects = xhash_new(3, INTEGER);
676 if (!clonemap->objects){
677 goto out_err_clonemap;
679 xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
681 goto out_err_objhash;
682 clonemap->size = xclone->size;
684 char *target = xseg_get_target(peer->xseg, pr->req);
685 strncpy(clonemap->volume, target, pr->req->targetlen);
686 clonemap->volumelen = pr->req->targetlen;
687 clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
689 //alloc and init map_nodes
/* NOTE(review): size/block_size + 1 allocates one extra node when size is
 * an exact multiple of block_size — differs from calc_map_obj's rounding;
 * verify intent */
690 unsigned long c = xclone->size/block_size + 1;
691 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
696 for (i = 0; i < xclone->size/block_size + 1; i++) {
/* inherit the source object if present, otherwise point at zero_block */
697 struct map_node *mn = find_object(map, i);
699 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
700 map_nodes[i].objectlen = mn->objectlen;
702 strncpy(map_nodes[i].object, zero_block, strlen(zero_block));
703 map_nodes[i].objectlen = strlen(zero_block);
705 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
706 map_nodes[i].flags = 0;
707 map_nodes[i].objectidx = i;
708 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
709 r = insert_object(clonemap, &map_nodes[i]);
714 r = insert_map(mapper, clonemap);
/* unwind paths */
723 //FIXME not freeing allocated queues of map_nodes
726 xq_free(&clonemap->pending);
728 xhash_free(clonemap->objects);
/*
 * Translate a client request's (offset, size) range into a scatterlist of
 * (object, offset, size) segments, written as an xseg_reply_map into the
 * resized request. For writes, any object that does not yet exist is
 * copied up first and the pr is parked on the object's pending queue until
 * the copyup completes (out_object_copying). Error/exit paths between the
 * visible lines are elided in this excerpt.
 */
736 static int req2objs(struct peerd *peer, struct peer_req *pr,
737 struct map *map, int write)
739 char *target = xseg_get_target(peer->xseg, pr->req);
740 uint32_t nr_objs = calc_nr_obj(pr->req);
741 uint64_t size = sizeof(struct xseg_reply_map) +
742 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
744 /* resize request to fit reply */
/* save the target name: resizing invalidates the request buffers */
745 char buf[XSEG_MAX_TARGET_LEN];
746 strncpy(buf, target, pr->req->targetlen);
747 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
749 printf("couldn't resize req\n");
752 target = xseg_get_target(peer->xseg, pr->req);
753 strncpy(target, buf, pr->req->targetlen);
755 /* structure reply */
756 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
757 reply->cnt = nr_objs;
/* first (possibly unaligned) segment */
760 uint64_t rem_size = pr->req->size;
761 uint64_t obj_index = pr->req->offset / block_size;
762 uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
763 uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size;
764 struct map_node * mn = find_object(map, obj_index);
766 printf("coudn't find obj_index\n");
/* a copyup is already in flight for this object: wait for it */
769 if (write && mn->flags & MF_OBJECT_COPYING)
770 goto out_object_copying;
771 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
772 //calc new_target, copy up object
773 r = copyup_object(peer, mn, pr);
775 printf("err_copy\n");
778 mn->flags |= MF_OBJECT_COPYING;
779 goto out_object_copying;
782 strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
783 reply->segs[idx].target[mn->objectlen] = 0;
784 reply->segs[idx].offset = obj_offset;
785 reply->segs[idx].size = obj_size;
786 rem_size -= obj_size;
/* remaining block-aligned segments */
787 while (rem_size > 0) {
/* NOTE(review): unsigned wrap makes "rem_size - block_size > 0" true for
 * any rem_size != block_size, so a final partial chunk would get
 * obj_size = block_size and rem_size would underflow; presumably
 * "rem_size > block_size" was intended — verify */
791 obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
792 rem_size -= obj_size;
793 mn = find_object(map, obj_index);
795 printf("coudn't find obj_index\n");
798 if (write && mn->flags & MF_OBJECT_COPYING)
799 goto out_object_copying;
800 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
801 //calc new_target, copy up object
802 r = copyup_object(peer, mn, pr);
804 printf("err_copy\n");
807 mn->flags |= MF_OBJECT_COPYING;
808 goto out_object_copying;
810 strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
811 reply->segs[idx].target[mn->objectlen] = 0;
812 reply->segs[idx].offset = obj_offset;
813 reply->segs[idx].size = obj_size;
/* out_object_copying: park the pr on the object being copied up */
819 //printf("r2o mn: %lx\n", mn);
820 if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
821 printf("couldn't append pr to tail\n");
/*
 * Handle an X_MAPR: resolve the target's map (loading it if needed) and
 * translate the request range into a read scatterlist via req2objs
 * (write = 0). Return/reply lines are elided in this excerpt.
 */
829 static int handle_mapr(struct peerd *peer, struct peer_req *pr,
830 struct xseg_request *req)
832 struct mapperd *mapper = __get_mapperd(peer);
833 struct mapper_io *mio = __get_mapper_io(pr);
836 char *target = xseg_get_target(peer->xseg, pr->req);
838 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
843 else if (r == MF_PENDING)
846 if (map->flags & MF_MAP_DESTROYED) {
852 r = req2objs(peer, pr, map, 0);
/*
 * Completion handler for an X_COPY issued by copyup_object. On success,
 * mark the object as existing, rename the map node to the copy target,
 * then drop the request's copyup association and re-dispatch every
 * peer_req parked on the object. Branch boundaries and the error path are
 * elided in this excerpt.
 */
864 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
865 struct xseg_request *req)
867 struct mapperd *mapper = __get_mapperd(peer);
869 struct mapper_io *mio = __get_mapper_io(pr);
871 //printf("handle copyup reply\n");
/* & binds tighter than &&, so this reads (state & XS_FAILED) && !(state & XS_SERVED) */
872 if (req->state & XS_FAILED && !(req->state & XS_SERVED)) {
873 //printf("copy up failed\n");
877 struct map_node *mn = __get_copyup_node(mio, req);
879 //printf("copy up mn not found\n");
883 //printf("mn: %lx\n", mn);
884 mn->flags &= ~MF_OBJECT_COPYING;
886 mn->flags |= MF_OBJECT_EXIST;
887 char *target = xseg_get_target(peer->xseg, req);
/* NOTE(review): strncpy does not NUL-terminate at targetlen; termination
 * and objectlen update are presumably on elided lines — verify */
888 strncpy(mn->object, target, req->targetlen);
/* forget the req --> map_node association and release the request */
891 __set_copyup_node(mio, req, NULL);
892 xseg_put_request(peer->xseg, req, peer->portno);
896 //handle peer_requests waiting on copy up
899 while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
900 //printf("dispatching pending\n");
901 struct peer_req * preq = (struct peer_req *) idx;
902 dispatch(peer, preq, preq->req);
/*
 * Handle an X_MAPW: route copyup replies to handle_copyup, otherwise
 * resolve the target's map and translate the range via req2objs
 * (write = 1), which may trigger copyups and leave the pr pending.
 * Return/reply lines are elided in this excerpt.
 */
909 static int handle_mapw(struct peerd *peer, struct peer_req *pr,
910 struct xseg_request *req)
912 struct mapperd *mapper = __get_mapperd(peer);
913 struct mapper_io *mio = __get_mapper_io(pr);
915 /* handle copy up replies separately */
916 if (req->op == X_COPY)
917 return handle_copyup(peer, pr, req);
919 char *target = xseg_get_target(peer->xseg, pr->req);
921 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
926 else if (r == MF_PENDING)
929 if (map->flags & MF_MAP_DESTROYED) {
/* NOTE(review): %lx with a pointer argument is a format mismatch; %p
 * (with a void * cast) would be correct */
930 printf("map MF_MAP_DESTROYED req %lx\n", pr->req);
935 //printf("mapw failed\n");
939 //printf("handle mapw\n");
942 r = req2objs(peer, pr, map, 1);
944 printf("req2obj returned r < 0 for req %lx\n", pr->req);
949 //else copyup pending, wait for pr restart
/* Snapshot handler: body elided in this excerpt; the dispatch table keeps
 * its X_SNAPSHOT case commented out, so this is effectively unused. */
954 static int handle_snap(struct peerd *peer, struct peer_req *pr,
955 struct xseg_request *req)
/*
 * Handle an X_INFO: resolve the target's map and reply with its size in
 * an xseg_reply_info placed in the request data. Return/reply lines are
 * elided in this excerpt.
 */
961 static int handle_info(struct peerd *peer, struct peer_req *pr,
962 struct xseg_request *req)
964 struct mapperd *mapper = __get_mapperd(peer);
965 struct mapper_io *mio = __get_mapper_io(pr);
967 char *target = xseg_get_target(peer->xseg, pr->req);
972 //printf("Handle info\n");
974 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
979 else if (r == MF_PENDING)
981 if (map->flags & MF_MAP_DESTROYED) {
986 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
987 xinfo->size = map->size;
/*
 * Handle an X_DELETE: mark the map destroyed. Actual removal/freeing is
 * deliberately left out (see the commented lines); the flag makes later
 * handlers reject the map. NOTE(review): the declaration of `target` used
 * below is on an elided line.
 */
993 static int handle_destroy(struct peerd *peer, struct peer_req *pr,
994 struct xseg_request *req)
998 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1003 else if (r == MF_PENDING)
1005 map->flags |= MF_MAP_DESTROYED;
1008 //do not delete all objects
1009 //remove_map(mapper, map);
1010 //free(map, map_nodes, all allocated resources);
1011 //complete(peer, pr);
/*
 * Main request router. Completions arriving as X_READ are map-block reads
 * (from load_map); everything else is routed by the original operation
 * recorded in pr->req->op. The default case and return are elided in this
 * excerpt.
 */
1016 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1018 struct mapperd *mapper = __get_mapperd(peer);
1020 struct mapper_io *mio = __get_mapper_io(pr);
1023 if (req->op == X_READ) {
1024 /* catch map reads requests here */
1025 handle_mapread(peer, pr, req);
/* note: switch on the original client op (pr->req), not the reply's op */
1029 switch (pr->req->op) {
1030 /* primary xseg operations of mapper */
1031 case X_CLONE: handle_clone(peer, pr, req); break;
1032 case X_MAPR: handle_mapr(peer, pr, req); break;
1033 case X_MAPW: handle_mapw(peer, pr, req); break;
1034 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1035 case X_INFO: handle_info(peer, pr, req); break;
1036 case X_DELETE: handle_destroy(peer, pr, req); break;
/*
 * Peer initialization: precompute magic_sha256 and the hexlified
 * zero-block hash, allocate the daemon-wide mapper state and the
 * per-request mapper_io states, and parse -bp (blocker port) and -t
 * (thread count; only 1 supported) arguments. Allocation checks are
 * flagged FIXME and mostly elided in this excerpt.
 */
1042 int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
1045 unsigned char buf[SHA256_DIGEST_SIZE];
1047 struct sha256_ctx sha256ctx;
1048 /* calculate out magic sha hash value */
1049 sha256_init_ctx(&sha256ctx);
1050 sha256_process_bytes(magic_string, strlen(magic_string), &sha256ctx);
1051 sha256_finish_ctx(&sha256ctx, magic_sha256);
1053 /* calculate zero block */
1054 //FIXME check hash value
/* NOTE(review): the zero buffer does not appear to be freed in the
 * visible lines — verify against the elided remainder */
1055 zero = malloc(block_size);
1056 memset(zero, 0, block_size);
1057 sha256_init_ctx(&sha256ctx);
1058 sha256_process_bytes(zero, block_size, &sha256ctx);
1059 sha256_finish_ctx(&sha256ctx, buf);
/* buf is unsigned char, so "%02x" emits exactly two digits per byte */
1060 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1061 sprintf(zero_block + 2*i, "%02x", buf[i]);
1062 printf("%s \n", zero_block);
1064 //FIXME error checks
1065 struct mapperd *mapper = malloc(sizeof(struct mapperd));
1066 mapper->hashmaps = xhash_new(3, STRING);
1067 peer->priv = mapper;
/* one mapper_io (copyup tracking state) per preallocated peer request */
1069 for (i = 0; i < peer->nr_ops; i++) {
1070 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1071 mio->copyups_nodes = xhash_new(3, INTEGER);
1074 peer->peer_reqs[i].priv = mio;
1077 for (i = 0; i < argc; i++) {
1078 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1079 mapper->bportno = atoi(argv[i+1]);
1083 /* enforce only one thread */
1084 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1085 int t = atoi(argv[i+1]);
1087 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1100 void print_obj(struct map_node *mn)
1102 printf("[%llu]object name: %s[%u] exists: %c\n", mn->objectidx, mn->object, mn->objectlen,
1103 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * Debug helper: print the map's volume, size and object count, then every
 * object node. Loop body tail is elided in this excerpt.
 * NOTE(review): "%llu" is paired with uint64_t values here; where uint64_t
 * is unsigned long this is a format mismatch — casts or PRIu64 would be
 * safer (see print_obj).
 */
1106 void print_map(struct map *m)
1108 uint64_t nr_objs = m->size/block_size;
1109 if (m->size % block_size)
1111 printf("Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
1112 m->volume, m->volumelen, m->size, nr_objs);
1114 struct map_node *mn;
1115 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1117 for (i = 0; i < nr_objs; i++) {
1118 mn = find_object(m, i);
1120 printf("object idx [%llu] not found!\n", i);
/*
 * Self-test: build a 100-object map with hashed object names, serialize
 * it (header + entries), parse it back into a second map via read_map,
 * and dump the serialized block to a file. Error checks and trailing
 * lines are elided in this excerpt.
 */
1127 void test_map(struct peerd *peer)
1130 struct sha256_ctx sha256ctx;
1131 unsigned char buf[SHA256_DIGEST_SIZE];
1132 char buf_new[XSEG_MAX_TARGET_LEN + 20];
1133 struct map *m = malloc(sizeof(struct map));
1134 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGET_LEN + 1);
1135 m->volume[XSEG_MAX_TARGET_LEN] = 0;
1136 strncpy(buf_new, m->volume, XSEG_MAX_TARGET_LEN);
1137 buf_new[XSEG_MAX_TARGET_LEN + 19] = 0;
1138 m->volumelen = XSEG_MAX_TARGET_LEN;
1139 m->size = 100*block_size;
1140 m->objects = xhash_new(3, INTEGER);
/* populate 100 nodes whose names are sha256(volume + index), hexlified */
1141 struct map_node *map_node = calloc(100, sizeof(struct map_node));
1142 for (i = 0; i < 100; i++) {
1143 sprintf(buf_new +XSEG_MAX_TARGET_LEN, "%u", i);
1144 sha256_init_ctx(&sha256ctx);
1145 sha256_process_bytes(buf_new, strlen(buf_new), &sha256ctx);
1146 sha256_finish_ctx(&sha256ctx, buf);
1147 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
1148 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
1150 map_node[i].objectidx = i;
1151 map_node[i].objectlen = XSEG_MAX_TARGET_LEN;
1152 map_node[i].flags = MF_OBJECT_EXIST;
1153 ret = insert_object(m, &map_node[i]);
/* serialize: header followed by one fixed-size entry per object */
1156 char *data = malloc(block_size);
1157 mapheader_to_map(m, data);
1158 uint64_t pos = mapheader_size;
1160 for (i = 0; i < 100; i++) {
1161 map_node = find_object(m, i);
1163 printf("no object node %d \n", i);
1166 object_to_map(data+pos, map_node);
1167 pos += objectsize_in_map;
/* parse the serialized block back into a fresh map */
1171 struct map *m2 = malloc(sizeof(struct map));
1172 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGET_LEN +1);
/* NOTE(review): the next two lines write m-> where m2-> appears intended
 * (m2's volume is never terminated and m2->volumelen never set) — verify */
1173 m->volume[XSEG_MAX_TARGET_LEN] = 0;
1174 m->volumelen = XSEG_MAX_TARGET_LEN;
1176 m2->objects = xhash_new(3, INTEGER);
1177 ret = read_map(peer, m2, data);
/* NOTE(review): open() with O_CREAT requires a third (mode) argument per
 * POSIX; omitting it passes garbage as the new file's permissions */
1180 int fd = open(m->volume, O_CREAT|O_WRONLY);
1182 while (sum < block_size) {
1183 r = write(fd, data + sum, block_size -sum);
1186 printf("write error\n");
1192 map_node = find_object(m, 0);