8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
16 #include <sys/syscall.h>
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
22 #define SHA256_DIGEST_SIZE 32
23 /* hex representation of sha256 value takes up double the sha256 size */
24 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26 #define block_size (1<<22) //FIXME this should be defined here?
27 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
28 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
30 #define MF_OBJECT_EXIST (1 << 0)
31 #define MF_OBJECT_COPYING (1 << 1)
32 #define MF_OBJECT_WRITING (1 << 2)
33 #define MF_OBJECT_DELETING (1 << 3)
35 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
36 extern struct log_ctx lc;
38 char *magic_string = "This a magic string. Please hash me";
39 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
40 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
42 //internal mapper states
55 char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
56 struct xq pending; /* pending peer_reqs on this object */
60 #define MF_MAP_LOADING (1 << 0)
61 #define MF_MAP_DESTROYED (1 << 1)
62 #define MF_MAP_WRITING (1 << 2)
63 #define MF_MAP_DELETING (1 << 3)
64 #define MF_MAP_DROPPING_CACHE (1 << 4)
66 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
67 MF_MAP_DROPPING_CACHE)
73 char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
74 xhash_t *objects; /* obj_index --> map_node */
75 struct xq pending; /* pending peer_reqs on this map */
79 xport bportno; /* blocker that accesses data */
80 xport mbportno; /* blocker that accesses maps */
81 xhash_t *hashmaps; // hash_function(target) --> struct map
85 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
86 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
87 struct map_node *copyup_node;
88 int err; /* error flag */
91 enum mapper_state state;
94 void print_map(struct map *m);
/* Return the mapper daemon context stored in the peer's private pointer. */
100 static inline struct mapperd * __get_mapperd(struct peerd *peer)
102 return (struct mapperd *) peer->priv;
/* Return the per-request mapper I/O context stored in the peer request's
 * private pointer. */
105 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
107 return (struct mapper_io *) pr->priv;
/* Number of objects needed to cover map->size: size / block_size, with the
 * remainder branch presumably rounding up by one (its body is elided in this
 * excerpt — TODO confirm). */
110 static inline uint64_t calc_map_obj(struct map *map)
112 uint64_t nr_objs = map->size / block_size;
113 if (map->size % block_size)
/* Count how many objects the request's [offset, offset+size) range spans.
 * The first object may be entered at a non-zero intra-block offset; every
 * subsequent object is consumed from its start. The per-iteration counter
 * increment is elided in this excerpt. */
118 static uint32_t calc_nr_obj(struct xseg_request *req)
121 uint64_t rem_size = req->size;
/* block_size is a power of two (1<<22), so & (block_size-1) is offset % block_size */
122 uint64_t obj_offset = req->offset & (block_size -1); //modulo
123 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
124 rem_size -= obj_size;
125 while (rem_size > 0) {
126 obj_size = (rem_size > block_size) ? block_size : rem_size;
127 rem_size -= obj_size;
135 * Maps handling functions
/* Look up a map by (not necessarily NUL-terminated) target name of length
 * targetlen in mapper->hashmaps. Returns the map or NULL (return path elided
 * in this excerpt). */
138 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
141 struct map *m = NULL;
142 char buf[XSEG_MAX_TARGETLEN+1];
143 //assert targetlen <= XSEG_MAX_TARGETLEN
/* NOTE(review): strncpy does not NUL-terminate when targetlen fills the copy;
 * presumably the elided line 145 sets buf[targetlen] = 0 — TODO confirm,
 * otherwise the log below and the hash lookup read past the string. */
144 strncpy(buf, target, targetlen);
146 XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
147 r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
/* Insert map into mapper->hashmaps keyed by its volume name, growing the
 * hash table as needed. Refuses duplicates (the early-return after the
 * duplicate warning is elided in this excerpt). */
154 static int insert_map(struct mapperd *mapper, struct map *map)
158 if (find_map(mapper, map->volume, map->volumelen)){
159 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
163 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
164 map->volume, strlen(map->volume), (unsigned long) map);
165 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Grow-and-retry loop: xhash_insert reports -XHASH_ERESIZE when full. */
166 while (r == -XHASH_ERESIZE) {
167 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
168 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
/* resize-failure handling elided; on failure the old table must stay valid */
170 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
171 (unsigned long long) shift);
174 mapper->hashmaps = new_hashmap;
175 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Remove map from mapper->hashmaps by volume key, shrinking the table when
 * xhash_delete asks for a resize. Mirrors insert_map's retry loop. */
181 static int remove_map(struct mapperd *mapper, struct map *map)
185 //assert no pending pr on map
187 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
188 while (r == -XHASH_ERESIZE) {
189 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
190 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
191 /* resize-failure path elided in this excerpt */
192 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
193 (unsigned long long) shift);
196 mapper->hashmaps = new_hashmap;
197 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* Find the map named by target/targetlen, or create it in MF_MAP_LOADING
 * state and submit an async X_READ of its on-disk map block to the maps
 * blocker port (mbportno). The calling peer request is queued on the map's
 * pending queue and re-dispatched when the read completes (handle_mapread).
 * Error paths unwind the request, the hash entry, pending queue and object
 * hash. Several intermediate checks/returns are elided in this excerpt. */
204 static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
209 struct xseg_request *req;
210 struct mapperd *mapper = __get_mapperd(peer);
213 struct map *m = find_map(mapper, target, targetlen);
/* Not cached: allocate and initialize a fresh struct map. */
215 m = malloc(sizeof(struct map));
217 XSEGLOG2(&lc, E, "Cannot allocate map ");
221 strncpy(m->volume, target, targetlen);
222 m->volume[targetlen] = 0;
223 m->volumelen = targetlen;
224 m->flags = MF_MAP_LOADING;
225 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
227 XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
231 m->objects = xhash_new(3, INTEGER);
233 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
/* Park the caller until the map block has been read. */
237 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
242 r = insert_map(mapper, m);
/* Build and submit the X_READ request for the map's first block. */
247 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
249 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
254 r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
256 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
261 char *reqtarget = xseg_get_target(peer->xseg, req);
264 strncpy(reqtarget, target, req->targetlen);
266 req->size = block_size;
268 r = xseg_set_req_data(peer->xseg, req, pr);
270 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
274 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
276 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
280 r = xseg_signal(peer->xseg, p);
282 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
/* --- error unwinding: release request, map entry, queued peer reqs --- */
286 xseg_get_req_data(peer->xseg, req, &dummy);
288 xseg_put_request(peer->xseg, req, pr->portno);
291 remove_map(mapper, m);
293 while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
294 fail(peer, (struct peer_req *) idx);
298 xhash_free(m->objects);
300 xq_free(&m->pending);
302 XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
/* --- map already present: queue behind the in-flight load, or dispatch --- */
308 //assert map loading when this is reached
309 if (m->flags & MF_MAP_LOADING) {
310 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
311 "Adding to pending queue", m->volume);
312 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
315 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
316 dispatch(peer, pr, pr->req, internal);
/* Resolve *m for target: if cached but busy (MF_MAP_NOT_READY) queue pr on
 * the map's pending queue; if cached and ready return it; otherwise start an
 * async load via load_map. Return-value conventions (e.g. MF_PENDING) follow
 * the elided lines — callers visibly test for MF_PENDING. */
322 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
323 char *target, uint32_t targetlen, struct map **m)
325 struct mapperd *mapper = __get_mapperd(peer);
327 *m = find_map(mapper, target, targetlen);
329 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
330 if ((*m)->flags & MF_MAP_NOT_READY) {
331 __xq_append_tail(&(*m)->pending, (xqindex) pr);
332 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
334 //} else if ((*m)->flags & MF_MAP_DESTROYED){
338 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
342 r = load_map(peer, pr, target, targetlen);
349 * Object handling functions
/* Look up the map_node for object index obj_index in map->objects;
 * the NULL-on-miss handling is elided in this excerpt. */
352 struct map_node *find_object(struct map *map, uint64_t obj_index)
355 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert mn into map->objects keyed by its object index, growing the hash
 * once on -XHASH_ERESIZE. Unlike insert_map this retries only once (if/
 * single retry), and does not check for a pre-existing entry. */
361 static int insert_object(struct map *map, struct map_node *mn)
363 //FIXME no find object first
364 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
365 if (r == -XHASH_ERESIZE) {
366 unsigned long shift = xhash_grow_size_shift(map->objects);
/* NOTE(review): xhash_resize result is assigned unchecked here; a failed
 * resize would clobber map->objects — elided lines may check this. */
367 map->objects = xhash_resize(map->objects, shift, NULL);
370 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
377 * map read/write functions
/* Decode one pithos-style (type 0) map entry: buf holds a raw 32-byte SHA-256
 * digest; store its lowercase hex form as the object name and mark the object
 * as existing. */
379 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
382 //hexlify sha256 value
383 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
/* each byte becomes two hex chars; sprintf NUL-terminates as it goes */
384 sprintf(mn->object+2*i, "%02x", buf[i]);
387 mn->object[SHA256_DIGEST_SIZE * 2] = 0;
388 mn->objectlen = SHA256_DIGEST_SIZE * 2;
389 mn->flags = MF_OBJECT_EXIST;
/* Decode one native (type 1) map entry: buf[0] is the transparency byte
 * (its test is elided in this excerpt — see MF_OBJECT_EXIST set below),
 * buf[1..] the fixed-width, zero-padded object name. */
392 static inline void map_to_object(struct map_node *mn, char *buf)
397 mn->flags |= MF_OBJECT_EXIST;
398 memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
399 mn->object[XSEG_MAX_TARGETLEN] = 0;
/* on-disk name is zero padded, so strlen recovers the real length */
400 mn->objectlen = strlen(mn->object);
/* Encode one map entry into buf (objectsize_in_map bytes): transparency byte
 * then the object name, zero padded to XSEG_MAX_TARGETLEN. Inverse of
 * map_to_object. */
403 static inline void object_to_map(char* buf, struct map_node *mn)
405 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
406 memcpy(buf+1, mn->object, mn->objectlen);
407 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
/* Serialize the map header into buf: magic SHA-256 value followed by the
 * 64-bit volume size (matches mapheader_size). */
410 static inline void mapheader_to_map(struct map *m, char *buf)
413 memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
414 pos += SHA256_DIGEST_SIZE;
415 memcpy(buf + pos, &m->size, sizeof(m->size));
416 pos += sizeof(m->size);
/* Submit an async write of a single serialized map entry (mn) at its slot
 * inside the on-disk map of `map`, via the maps blocker. Callers treat
 * MF_PENDING as success-in-flight (returns are elided in this excerpt);
 * the trailing labels unwind the request on failure. */
420 static int object_write(struct peerd *peer, struct peer_req *pr,
421 struct map *map, struct map_node *mn)
424 struct mapperd *mapper = __get_mapperd(peer);
425 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
426 mapper->mbportno, X_ALLOC);
428 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
430 mn->object, map->volume, (unsigned long long) mn->objectidx);
433 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
435 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
437 mn->object, map->volume, (unsigned long long) mn->objectidx);
440 char *target = xseg_get_target(peer->xseg, req);
441 strncpy(target, map->volume, req->targetlen);
442 req->size = objectsize_in_map;
/* entry slot: header, then fixed-size records indexed by objectidx */
443 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
445 char *data = xseg_get_data(peer->xseg, req);
446 object_to_map(data, mn);
448 r = xseg_set_req_data(peer->xseg, req, pr);
450 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
452 mn->object, map->volume, (unsigned long long) mn->objectidx);
455 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
457 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
459 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* signal failure is non-fatal: the write is already queued */
462 r = xseg_signal(peer->xseg, p);
464 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
466 XSEGLOG2(&lc, I, "Writing object %s \n\t"
468 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* --- error unwinding --- */
473 xseg_get_req_data(peer->xseg, req, &dummy);
475 xseg_put_request(peer->xseg, req, pr->portno);
477 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
479 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Serialize the whole map (header + every object entry) into one request and
 * submit it as an async write to the maps blocker, setting MF_MAP_WRITING.
 * Fails if any object of the map is missing from the object hash. Several
 * returns/checks are elided in this excerpt. */
483 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
486 struct mapperd *mapper = __get_mapperd(peer);
488 uint64_t i, pos, max_objidx = calc_map_obj(map);
489 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
490 mapper->mbportno, X_ALLOC);
492 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
495 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
496 mapheader_size + max_objidx * objectsize_in_map);
498 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
501 char *target = xseg_get_target(peer->xseg, req);
502 strncpy(target, map->volume, req->targetlen);
503 char *data = xseg_get_data(peer->xseg, req);
504 mapheader_to_map(map, data);
505 pos = mapheader_size;
507 req->size = req->datalen;
510 if (map->size % block_size)
/* append each object's entry after the header, in index order */
512 for (i = 0; i < max_objidx; i++) {
513 mn = find_object(map, i);
515 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
516 (unsigned long long) i, map->volume);
519 object_to_map(data+pos, mn);
520 pos += objectsize_in_map;
522 r = xseg_set_req_data(peer->xseg, req, pr);
524 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
528 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
530 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
534 r = xseg_signal(peer->xseg, p);
536 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
538 map->flags |= MF_MAP_WRITING;
539 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
/* --- error unwinding --- */
543 xseg_get_req_data(peer->xseg, req, &dummy);
545 xseg_put_request(peer->xseg, req, pr->portno);
547 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Parse a just-read map block (buf) into in-memory map_nodes and insert them
 * into map->objects. Two on-disk formats: type 1 (ours — magic hash header,
 * size, fixed-width entries) and type 0 (pithos — raw SHA-256 digests until
 * an all-zero digest terminator). Error handling is largely elided
 * (see the FIXME below). */
551 static int read_map (struct peerd *peer, struct map *map, char *buf)
553 char nulls[SHA256_DIGEST_SIZE];
554 memset(nulls, 0, SHA256_DIGEST_SIZE);
/* an all-zero header means an empty/uninitialized map block */
556 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
561 //type 1, our type, type 0 pithos map
562 int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
563 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
566 struct map_node *map_node;
/* --- type 1: header carries the volume size --- */
568 pos = SHA256_DIGEST_SIZE;
569 map->size = *(uint64_t *) (buf + pos);
570 pos += sizeof(uint64_t);
571 nr_objs = map->size / block_size;
572 if (map->size % block_size)
574 map_node = calloc(nr_objs, sizeof(struct map_node));
578 for (i = 0; i < nr_objs; i++) {
579 map_node[i].map = map;
580 map_node[i].objectidx = i;
581 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
583 map_to_object(&map_node[i], buf + pos);
584 pos += objectsize_in_map;
585 r = insert_object(map, &map_node[i]); //FIXME error check
/* --- type 0: pithos map, digests until a zero digest or block end --- */
589 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
590 map_node = calloc(max_nr_objs, sizeof(struct map_node));
593 for (i = 0; i < max_nr_objs; i++) {
594 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
596 map_node[i].objectidx = i;
597 map_node[i].map = map;
598 xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
600 pithosmap_to_object(&map_node[i], buf + pos);
601 pos += SHA256_DIGEST_SIZE;
602 r = insert_object(map, &map_node[i]); //FIXME error check
/* pithos maps carry no size header; derive size from the entry count */
604 map->size = i * block_size;
606 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
609 //FIXME cleanup on error
/* Associate req -> mn in mio->copyups_nodes so the copy-up reply handler can
 * recover the map node. Called with mn == NULL to remove the association
 * (the branch split between insert and delete is elided in this excerpt).
 * Also caches the last node in mio->copyup_node. */
616 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
621 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
622 if (r == -XHASH_ERESIZE) {
623 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
624 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
627 mio->copyups_nodes = new_hashmap;
628 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
/* removal path (mn == NULL), with shrink-and-retry */
632 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
633 if (r == -XHASH_ERESIZE) {
634 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
635 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
638 mio->copyups_nodes = new_hashmap;
639 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
644 mio->copyup_node = mn;
/* Reverse of __set_copyup_node: find the map node a copy-up/write request
 * belongs to. Falls back to the cached mio->copyup_node (presumably when the
 * hash lookup misses — the branch is elided in this excerpt). */
648 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
652 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
657 return mio->copyup_node;
/* Copy-on-write: derive a new object name as hex(SHA-256(volume + objectidx)),
 * then submit an X(copy) request duplicating the current object into the new
 * name via the data blocker. If the current object is the well-known zero
 * block, skip the copy and go straight to writing the updated map entry.
 * Sets MF_OBJECT_COPYING (or MF_OBJECT_WRITING on the zero-block path).
 * Several checks/returns are elided in this excerpt. */
660 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
662 struct mapperd *mapper = __get_mapperd(peer);
663 struct mapper_io *mio = __get_mapper_io(pr);
664 struct map *map = mn->map;
669 //struct sha256_ctx sha256ctx;
670 uint32_t newtargetlen;
671 char new_target[XSEG_MAX_TARGETLEN + 1];
672 unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
673 char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
674 strncpy(new_object, map->volume, map->volumelen);
675 sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
676 new_object[XSEG_MAX_TARGETLEN + 19] = 0;
/* hash "<volume><objectidx>" and hexlify it into new_target */
678 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
679 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
680 sprintf (new_target + 2*i, "%02x", buf[i]);
681 newtargetlen = SHA256_DIGEST_SIZE * 2;
683 if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
684 goto copyup_zeroblock;
/* --- normal path: issue the copy request --- */
686 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
687 mapper->bportno, X_ALLOC);
690 r = xseg_prep_request(peer->xseg, req, newtargetlen,
691 sizeof(struct xseg_request_copy));
695 char *target = xseg_get_target(peer->xseg, req);
696 strncpy(target, new_target, req->targetlen);
698 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
699 strncpy(xcopy->target, mn->object, mn->objectlen);
700 xcopy->targetlen = mn->objectlen;
703 req->size = block_size;
705 r = xseg_set_req_data(peer->xseg, req, pr);
/* remember which map node this copy-up belongs to for the reply handler */
708 r = __set_copyup_node(mio, req, mn);
709 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
713 xseg_signal(peer->xseg, p);
716 mn->flags |= MF_OBJECT_COPYING;
717 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* --- error unwinding --- */
721 r = __set_copyup_node(mio, req, NULL);
722 xseg_get_req_data(peer->xseg, req, &dummy);
724 xseg_put_request(peer->xseg, req, pr->portno);
726 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
/* --- zero-block path: no data copy needed, just update the map entry --- */
730 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
731 "Proceeding in writing the new object in map");
732 /* construct a tmp map_node for writing purposes */
733 struct map_node newmn = *mn;
734 newmn.flags = MF_OBJECT_EXIST;
735 strncpy(newmn.object, new_target, newtargetlen);
736 newmn.object[newtargetlen] = 0;
737 newmn.objectlen = newtargetlen;
738 newmn.objectidx = mn->objectidx;
/* NOTE(review): `req` was never allocated on this path — presumably the
 * elided context makes this safe or it keys on a stale/NULL req; confirm. */
739 r = __set_copyup_node(mio, req, mn);
740 r = object_write(peer, pr, map, &newmn);
741 if (r != MF_PENDING){
742 XSEGLOG2(&lc, E, "Object write returned error for object %s"
743 "\n\t of map %s [%llu]",
744 mn->object, map->volume, (unsigned long long) mn->objectidx);
747 mn->flags |= MF_OBJECT_WRITING;
748 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
753 * request handling functions
/* Completion handler for the async map-block X_READ issued by load_map.
 * On success: parse via read_map, clear MF_MAP_LOADING, and re-dispatch every
 * peer request queued on the map. On failure: tear the map down and fail the
 * queued requests. Some transitions/returns are elided in this excerpt. */
756 static int handle_mapread(struct peerd *peer, struct peer_req *pr,
757 struct xseg_request *req)
761 char buf[XSEG_MAX_TARGETLEN];
762 struct mapperd *mapper = __get_mapperd(peer);
763 //assert req->op = X_READ;
764 char *target = xseg_get_target(peer->xseg, req);
765 struct map *map = find_map(mapper, target, req->targetlen);
768 //assert map->flags & MF_MAP_LOADING
770 if (req->state & XS_FAILED)
773 char *data = xseg_get_data(peer->xseg, req);
774 r = read_map(peer, map, data);
778 xseg_put_request(peer->xseg, req, pr->portno);
779 map->flags &= ~MF_MAP_LOADING;
780 XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
/* snapshot the count so requeues during dispatch don't loop forever */
781 uint64_t qsize = xq_count(&map->pending);
782 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
784 struct peer_req *preq = (struct peer_req *) idx;
785 dispatch(peer, preq, preq->req, internal);
/* --- failure path: undo the map and fail all waiters --- */
790 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
791 xseg_put_request(peer->xseg, req, pr->portno);
792 map->flags &= ~MF_MAP_LOADING;
793 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
794 struct peer_req *preq = (struct peer_req *) idx;
797 remove_map(mapper, map);
798 //FIXME not freeing up all objects + object hash
/* --- map lookup miss: log the (NUL-terminated copy of the) target --- */
803 strncpy(buf, target, req->targetlen);
804 buf[req->targetlen] = 0;
805 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
806 xseg_put_request(peer->xseg, req, pr->portno);
/* Completion handler for the async whole-map X_WRITE issued by map_write.
 * Mirrors handle_mapread: on success clear MF_MAP_WRITING and re-dispatch
 * pending requests; on failure tear the map down and fail the waiters.
 * Some transitions/returns are elided in this excerpt. */
810 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
811 struct xseg_request *req)
814 char buf[XSEG_MAX_TARGETLEN];
815 struct mapperd *mapper = __get_mapperd(peer);
816 //assert req->op = X_WRITE;
817 char *target = xseg_get_target(peer->xseg, req);
818 struct map *map = find_map(mapper, target, req->targetlen);
/* NOTE(review): stderr instead of XSEGLOG2 — inconsistent with the rest
 * of the file's logging */
820 fprintf(stderr, "couldn't find map\n");
823 //assert map->flags & MF_MAP_WRITING
825 if (req->state & XS_FAILED){
826 fprintf(stderr, "write request failed\n");
830 xseg_put_request(peer->xseg, req, pr->portno);
831 map->flags &= ~MF_MAP_WRITING;
832 XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
/* snapshot the count so requeues during dispatch don't loop forever */
833 uint64_t qsize = xq_count(&map->pending);
834 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
836 struct peer_req *preq = (struct peer_req *) idx;
837 dispatch(peer, preq, preq->req, internal);
/* --- failure path: undo the map and fail all waiters --- */
843 XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
844 xseg_put_request(peer->xseg, req, pr->portno);
845 map->flags &= ~MF_MAP_WRITING;
846 while((idx = __xq_pop_head(&map->pending)) != Noneidx){
847 struct peer_req *preq = (struct peer_req *) idx;
850 remove_map(mapper, map);
851 //FIXME not freeing up all objects + object hash
/* --- map lookup miss --- */
856 strncpy(buf, target, req->targetlen);
857 buf[req->targetlen] = 0;
858 XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
859 xseg_put_request(peer->xseg, req, pr->portno);
/* Handle an X_CLONE request: load (or find) the base map named in the
 * xseg_request_clone payload, build a new map that references the base's
 * objects (or the zero block beyond them), insert it, and write it out
 * asynchronously via map_write. Re-entered for the X_WRITE completion and
 * for the WRITING state to finish the clone. Large portions (returns,
 * null-checks, error labels) are elided in this excerpt. */
863 static int handle_clone(struct peerd *peer, struct peer_req *pr,
864 struct xseg_request *req)
866 struct mapperd *mapper = __get_mapperd(peer);
867 struct mapper_io *mio = __get_mapper_io(pr);
870 char buf[XSEG_MAX_TARGETLEN + 1];
873 if (pr->req->op != X_CLONE) {
875 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
/* re-entry: completion of the clone map's write */
880 if (req->op == X_WRITE){
881 //assert state = WRITING;
882 r = handle_mapwrite(peer, pr ,req);
884 XSEGLOG2(&lc, E, "handle mapwrite returned error");
890 if (mio->state == WRITING) {
891 target = xseg_get_target(peer->xseg, pr->req);
892 strncpy(buf, target, req->targetlen);
893 buf[req->targetlen] = 0;
894 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
/* first pass: resolve the base map from the clone payload */
899 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
904 r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
908 else if (r == MF_PENDING)
911 if (map->flags & MF_MAP_DESTROYED) {
912 strncpy(buf, xclone->target, xclone->targetlen);
913 buf[xclone->targetlen] = 0;
914 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
915 target = xseg_get_target(peer->xseg, pr->req);
916 strncpy(buf, target, req->targetlen);
917 buf[req->targetlen] = 0;
918 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
923 struct map *clonemap = malloc(sizeof(struct map));
928 FIXME check if clone map exists
929 find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
930 ... (on destroyed what ??
932 target = xseg_get_target(peer->xseg, pr->req);
933 strncpy(buf, target, req->targetlen);
934 buf[req->targetlen] = 0;
935 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
940 //alloc and init struct map
941 clonemap->objects = xhash_new(3, INTEGER);
942 if (!clonemap->objects){
943 goto out_err_clonemap;
945 xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
947 goto out_err_objhash;
/* clone size must be >= base size (or -1 meaning "same as base") */
949 if (xclone->size < map->size) {
950 target = xseg_get_target(peer->xseg, pr->req);
951 strncpy(buf, target, req->targetlen);
952 buf[req->targetlen] = 0;
953 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
954 "\n\t for requested clone %s",
955 (unsigned long long) xclone->size,
956 (unsigned long long) map->size, buf);
959 if (xclone->size == -1)
960 clonemap->size = map->size;
962 clonemap->size = xclone->size;
964 target = xseg_get_target(peer->xseg, pr->req);
965 strncpy(clonemap->volume, target, pr->req->targetlen);
966 clonemap->volumelen = pr->req->targetlen;
967 clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
969 //alloc and init map_nodes
970 unsigned long c = clonemap->size/block_size + 1;
971 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
/* base objects are shared; beyond the base, entries point at the zero block */
976 for (i = 0; i < clonemap->size/block_size + 1; i++) {
977 struct map_node *mn = find_object(map, i);
979 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
980 map_nodes[i].objectlen = mn->objectlen;
982 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
983 map_nodes[i].objectlen = strlen(zero_block);
985 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
986 map_nodes[i].flags = 0;
987 map_nodes[i].objectidx = i;
988 map_nodes[i].map = clonemap;
989 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
990 r = insert_object(clonemap, &map_nodes[i]);
996 r = insert_map(mapper, clonemap);
998 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
/* persist the new map; MF_PENDING parks this pr until the write completes */
1001 r = map_write(peer, pr, clonemap);
1003 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1006 else if (r == MF_PENDING) {
1007 //maybe move this to map_write
1008 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1009 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1010 mio->state = WRITING;
1014 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
/* --- error unwinding labels --- */
1021 remove_map(mapper, clonemap);
1023 //FIXME not freeing allocated queues of map_nodes
1026 xq_free(&clonemap->pending);
1028 xhash_free(clonemap->objects);
1032 target = xseg_get_target(peer->xseg, pr->req);
1033 strncpy(buf, target, req->targetlen);
1034 buf[req->targetlen] = 0;
1035 XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
/* Translate the caller's (offset, size) range on `map` into a scatterlist
 * reply of (object, offset, size) segments, resizing pr->req to hold the
 * xseg_reply_map. For writes, objects that are busy or do not yet exist are
 * copied up first and the request is parked on the object's pending queue
 * (out_object_copying). The first segment is computed before the loop, then
 * the loop handles the remaining full/partial blocks; per-iteration index
 * updates are elided in this excerpt. */
1040 static int req2objs(struct peerd *peer, struct peer_req *pr,
1041 struct map *map, int write)
1043 char *target = xseg_get_target(peer->xseg, pr->req);
1044 uint32_t nr_objs = calc_nr_obj(pr->req);
1045 uint64_t size = sizeof(struct xseg_reply_map) +
1046 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1048 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1049 /* resize request to fit reply */
/* save the target name: resizing invalidates the request's buffers */
1050 char buf[XSEG_MAX_TARGETLEN];
1051 strncpy(buf, target, pr->req->targetlen);
1052 int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1054 XSEGLOG2(&lc, E, "Cannot resize request");
1057 target = xseg_get_target(peer->xseg, pr->req);
1058 strncpy(target, buf, pr->req->targetlen);
1060 /* structure reply */
1061 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1062 reply->cnt = nr_objs;
/* first segment: may start at a non-zero offset inside its block */
1065 uint64_t rem_size = pr->req->size;
1066 uint64_t obj_index = pr->req->offset / block_size;
1067 uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1068 uint64_t obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1069 struct map_node * mn = find_object(map, obj_index);
1071 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
/* writes must wait for in-flight copy/write/delete on the object */
1074 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1075 goto out_object_copying;
1076 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1077 //calc new_target, copy up object
1078 r = copyup_object(peer, mn, pr);
1080 XSEGLOG2(&lc, E, "Error in copy up object");
1083 goto out_object_copying;
1086 // XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n",
1087 // (unsigned long long) pr->req->offset,
1088 // (unsigned long long) pr->req->size,
1090 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1091 reply->segs[idx].targetlen = mn->objectlen;
1092 reply->segs[idx].offset = obj_offset;
1093 reply->segs[idx].size = obj_size;
1094 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1095 // (unsigned long long) reply->segs[idx].size,
1096 // (unsigned long long) reply->segs[idx].offset);
1097 rem_size -= obj_size;
/* remaining segments: whole blocks (offset 0) except possibly the last */
1098 while (rem_size > 0) {
1102 obj_size = (rem_size > block_size) ? block_size : rem_size;
1103 rem_size -= obj_size;
1104 mn = find_object(map, obj_index);
1106 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1109 if (write && (mn->flags & MF_OBJECT_NOT_READY))
1110 goto out_object_copying;
1111 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1112 //calc new_target, copy up object
1113 r = copyup_object(peer, mn, pr);
1115 XSEGLOG2(&lc, E, "Error in copy up object");
1118 goto out_object_copying;
1120 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1121 reply->segs[idx].targetlen = mn->objectlen;
1122 reply->segs[idx].offset = obj_offset;
1123 reply->segs[idx].size = obj_size;
1124 // XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1125 // (unsigned long long) reply->segs[idx].size,
1126 // (unsigned long long) reply->segs[idx].offset);
/* sanity check: segments produced must match the precomputed count */
1128 if (reply->cnt != (idx + 1)){
1129 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
1136 //printf("r2o mn: %lx\n", mn);
1137 //printf("volume %s pending on %s\n", map->volume, mn->object);
/* park pr on the busy object; re-dispatched when the object settles */
1139 if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1140 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1141 XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1142 mn->object, (unsigned long long) mn->objectidx, map->volume);
/* Handle a map-read (X_MAPR) request: resolve the map (possibly triggering
 * an async load, in which case we return pending), reject destroyed maps,
 * and fill the scatterlist reply via req2objs(write=0). The tail dumps the
 * reply segments at debug level. Several returns are elided in this excerpt. */
1150 static int handle_mapr(struct peerd *peer, struct peer_req *pr,
1151 struct xseg_request *req)
1153 struct mapperd *mapper = __get_mapperd(peer);
1154 struct mapper_io *mio = __get_mapper_io(pr);
1158 char *target = xseg_get_target(peer->xseg, pr->req);
1160 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1165 else if (r == MF_PENDING)
1168 if (map->flags & MF_MAP_DESTROYED) {
/* read translation: write=0, so no copy-ups are triggered */
1174 r = req2objs(peer, pr, map, 0);
1176 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1178 (unsigned long long) pr->req->offset,
1179 (unsigned long long) (pr->req->offset + pr->req->size));
1183 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1185 (unsigned long long) pr->req->offset,
1186 (unsigned long long) (pr->req->offset + pr->req->size));
1187 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1188 (unsigned long long) req->offset,
1189 (unsigned long long) req->size);
/* debug: dump each reply segment with a NUL-terminated copy of its name */
1190 char buf[XSEG_MAX_TARGETLEN+1];
1191 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1193 for (i = 0; i < reply->cnt; i++) {
1194 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1195 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1196 buf[reply->segs[i].targetlen] = 0;
1197 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1198 (unsigned long long) reply->segs[i].offset,
1199 (unsigned long long) reply->segs[i].size);
/* Completion handler for an object copy-up (X_COPY) request. On success,
 * build a temporary map_node carrying the new object name and kick off the
 * map-entry write (object_write), moving the object from COPYING to WRITING.
 * On failure, fail every peer request parked on the object. Several
 * returns/branches are elided in this excerpt. */
1208 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1209 struct xseg_request *req)
1211 struct mapperd *mapper = __get_mapperd(peer);
1213 struct mapper_io *mio = __get_mapper_io(pr);
/* recover which object this copy request belonged to */
1216 struct map_node *mn = __get_copyup_node(mio, req);
1220 mn->flags &= ~MF_OBJECT_COPYING;
1221 if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1222 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1225 struct map *map = mn->map;
1227 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1231 /* construct a tmp map_node for writing purposes */
1232 char *target = xseg_get_target(peer->xseg, req);
1233 struct map_node newmn = *mn;
1234 newmn.flags = MF_OBJECT_EXIST;
1235 strncpy(newmn.object, target, req->targetlen);
1236 newmn.object[req->targetlen] = 0;
1237 newmn.objectlen = req->targetlen;
1238 newmn.objectidx = mn->objectidx;
1239 r = object_write(peer, pr, map, &newmn);
1240 if (r != MF_PENDING){
1241 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1242 "\n\t of map %s [%llu]",
1243 mn->object, map->volume, (unsigned long long) mn->objectidx);
1246 mn->flags |= MF_OBJECT_WRITING;
1247 xseg_put_request(peer->xseg, req, pr->portno);
1248 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/* --- failure path: drop the mapping and fail all waiters on this object --- */
1252 xseg_put_request(peer->xseg, req, pr->portno);
1253 __set_copyup_node(mio, req, NULL);
1254 while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1255 struct peer_req * preq = (struct peer_req *) idx;
1261 XSEGLOG2(&lc, E, "Cannot get map node");
/* Completion handler for a single map-entry X_WRITE (issued by object_write
 * after a copy-up). On success, commit the new object name into the cached
 * map_node and re-dispatch everything parked on it; on failure, fail the
 * waiters. Several returns are elided in this excerpt. */
1265 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1266 struct xseg_request *req)
1269 struct mapperd *mapper = __get_mapperd(peer);
1270 struct mapper_io *mio = __get_mapper_io(pr);
1271 //assert req->op = X_WRITE;
1272 char *target = xseg_get_target(peer->xseg, req);
1275 //printf("handle object write replyi\n");
1276 struct map_node *mn = __get_copyup_node(mio, req);
1280 __set_copyup_node(mio, req, NULL);
1282 //assert mn->flags & MF_OBJECT_WRITING
1283 mn->flags &= ~MF_OBJECT_WRITING;
1284 if (req->state & XS_FAILED)
/* decode the entry we just wrote back out of the request payload */
1287 struct map_node tmp;
1288 char *data = xseg_get_data(peer->xseg, req);
1289 map_to_object(&tmp, data);
1290 mn->flags |= MF_OBJECT_EXIST;
/* NOTE(review): EXIST was just OR-ed in on the line above, so this guards
 * against any *other* flag still being set — verify that is the intent. */
1291 if (mn->flags != MF_OBJECT_EXIST){
1292 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1295 //assert mn->flags & MF_OBJECT_EXIST
1296 strncpy(mn->object, tmp.object, tmp.objectlen);
1297 mn->object[tmp.objectlen] = 0;
1298 mn->objectlen = tmp.objectlen;
1299 xseg_put_request(peer->xseg, req, pr->portno);
1301 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
/* snapshot the count so requeues during dispatch don't loop forever */
1302 uint64_t qsize = xq_count(&mn->pending);
1303 while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1305 struct peer_req * preq = (struct peer_req *) idx;
1306 dispatch(peer, preq, preq->req, internal);
/* --- failure path: fail everything waiting on this object --- */
1311 XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1312 xseg_put_request(peer->xseg, req, pr->portno);
1313 while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1314 struct peer_req *preq = (struct peer_req *) idx;
1320 XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1321 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * handle_mapw: serve an X_MAPW request — map a write range of a volume to
 * its backing objects, triggering copy-ups for objects that do not yet
 * exist (req2objs(..., 1) => write semantics).
 *
 * Internal replies are demultiplexed first: X_COPY completions go to
 * handle_copyup(), X_WRITE completions (object map updates) go to
 * handle_objectwrite(). Otherwise the map is located/loaded (possibly
 * leaving the request pending) and the range is resolved via req2objs().
 *
 * Fix: the two range log messages said "Map r of map ..." — copy-pasted
 * from the map-read handler; this is the map-WRITE handler, so log "Map w".
 * NOTE(review): the "failed" message is logged at level I, not E — looks
 * intentional in the original style but worth confirming.
 */
1325 static int handle_mapw(struct peerd *peer, struct peer_req *pr,
1326 struct xseg_request *req)
1328 struct mapperd *mapper = __get_mapperd(peer);
1329 struct mapper_io *mio = __get_mapper_io(pr);
1332 /* handle copy up replies separately */
1333 if (req->op == X_COPY){
1334 if (handle_copyup(peer, pr, req) < 0){
1335 XSEGLOG2(&lc, E, "Handle copy up returned error");
1342 else if(req->op == X_WRITE){
1343 /* handle replies of object write operations */
1344 if (handle_objectwrite(peer, pr, req) < 0) {
1345 XSEGLOG2(&lc, E, "Handle object write returned error");
/* fresh request (or restart): resolve the target volume's map */
1353 char *target = xseg_get_target(peer->xseg, pr->req);
1355 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1360 else if (r == MF_PENDING)
1363 if (map->flags & MF_MAP_DESTROYED) {
/* 1 => write mapping: trigger copy-ups for non-existing objects */
1368 r = req2objs(peer, pr, map, 1);
1370 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1372 (unsigned long long) pr->req->offset,
1373 (unsigned long long) (pr->req->offset + pr->req->size));
1377 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1379 (unsigned long long) pr->req->offset,
1380 (unsigned long long) (pr->req->offset + pr->req->size));
1381 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1382 (unsigned long long) req->offset,
1383 (unsigned long long) req->size);
1384 char buf[XSEG_MAX_TARGETLEN+1];
1385 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
/* debug dump of each resolved segment of the reply */
1387 for (i = 0; i < reply->cnt; i++) {
1388 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1389 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1390 buf[reply->segs[i].targetlen] = 0;
1391 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1392 (unsigned long long) reply->segs[i].offset,
1393 (unsigned long long) reply->segs[i].size);
1397 //else copyup pending, wait for pr restart
/*
 * handle_snap: handle an X_SNAPSHOT request.
 * NOTE(review): the body is not visible in this chunk; the caller in
 * dispatch() has the X_SNAPSHOT case commented out, so this appears unused.
 */
1402 static int handle_snap(struct peerd *peer, struct peer_req *pr,
1403 struct xseg_request *req)
/*
 * handle_info: serve an X_INFO request — report the size of the target
 * volume's map. Loads the map if needed (may return pending and restart
 * later); destroyed maps are rejected.
 */
1409 static int handle_info(struct peerd *peer, struct peer_req *pr,
1410 struct xseg_request *req)
1412 struct mapperd *mapper = __get_mapperd(peer);
1413 struct mapper_io *mio = __get_mapper_io(pr);
1416 char *target = xseg_get_target(peer->xseg, pr->req);
1421 //printf("Handle info\n");
1423 int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1428 else if (r == MF_PENDING)
1430 if (map->flags & MF_MAP_DESTROYED) {
/* reply payload is an xseg_reply_info carrying only the volume size */
1435 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1436 xinfo->size = map->size;
/*
 * delete_object: submit an X_DELETE for a single object to the data
 * blocker (mapper->bportno) and mark the object MF_OBJECT_DELETING.
 *
 * If the object already has pending requests, pr is appended to the
 * object's pending queue instead of issuing the delete now.
 * On submit failure the request data/association is undone and the
 * request released.
 * NOTE(review): return-value lines are elided; visible logging suggests a
 * pending/error result convention (MF_PENDING on success) — confirm.
 */
1442 static int delete_object(struct peerd *peer, struct peer_req *pr,
1443 struct map_node *mn)
1446 struct mapperd *mapper = __get_mapperd(peer);
1447 struct mapper_io *mio = __get_mapper_io(pr);
/* remember which object index this deletion refers to */
1449 mio->delobj = mn->objectidx;
1450 if (xq_count(&mn->pending) != 0) {
1451 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1452 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1457 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1458 mapper->bportno, X_ALLOC);
1461 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1464 char *target = xseg_get_target(peer->xseg, req);
1465 strncpy(target, mn->object, req->targetlen);
1467 req->size = req->datalen;
1470 r = xseg_set_req_data(peer->xseg, req, pr);
/* associate the request with its map_node for the reply path */
1473 __set_copyup_node(mio, req, mn);
1474 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1477 r = xseg_signal(peer->xseg, p);
1478 mn->flags |= MF_OBJECT_DELETING;
1479 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
/* error path: detach req data and release the request */
1483 xseg_get_req_data(peer->xseg, req, &dummy);
1485 xseg_put_request(peer->xseg, req, pr->portno);
1487 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1492 * Find next object for deletion. Start searching on idx mio->delobj.
1493 * Skip non existing map_nodes, free_resources and skip non-existing objects
1494 * Wait for all pending operations on the object, before moving forward to the
1497 * Return MF_PENDING if theres is a pending operation on the next object
1498 * or zero if there is no next object
/*
 * (review) Iterates object indices from mio->delobj up to calc_map_obj(map):
 * objects with pending requests park pr on their queue; existing objects get
 * delete_object() issued; non-existing ones just have their pending queue
 * freed and are skipped.
 */
1500 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
1503 struct mapperd *mapper = __get_mapperd(peer);
1504 struct mapper_io *mio = __get_mapper_io(pr);
1505 uint64_t idx = mio->delobj;
1506 struct map_node *mn;
1509 while (idx < calc_map_obj(map)) {
1510 mn = find_object(map, idx);
1516 if (xq_count(&mn->pending) != 0) {
1517 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1518 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1522 if (mn->flags & MF_OBJECT_EXIST){
1523 r = delete_object(peer, pr, mn);
1525 /* on error, just log it, release resources and
1526 * proceed to the next object
1528 XSEGLOG2(&lc, E, "Object %s delete object return error"
1529 "\n\t Map: %s [%llu]",
1530 mn->object, mn->map->volume,
1531 (unsigned long long) mn->objectidx);
1532 xq_free(&mn->pending);
1534 else if (r == MF_PENDING){
/* object does not exist on the blocker: nothing to delete, free queue */
1538 xq_free(&mn->pending);
/*
 * handle_object_delete: continuation after one object's X_DELETE reply.
 *
 * Clears MF_OBJECT_DELETING, frees the node's pending queue, and advances
 * to the next object via delete_next_object(). When nothing is left
 * pending, the whole map is declared destroyed: all requests queued on the
 * map are re-dispatched (to fail), and the map's resources are removed.
 * Per the original note, an individual object-delete failure does NOT
 * abort map deletion (the map block itself is already gone).
 */
1545 static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
1546 struct map_node *mn, int err)
1548 struct mapperd *mapper = __get_mapperd(peer);
1549 struct mapper_io *mio = __get_mapper_io(pr);
1551 struct map *map = mn->map;
1554 //if object deletion failed, map deletion must continue
1555 //and report OK, since map block has been deleted succesfully
1556 //so, no check for err
1558 //assert object flags OK
1559 //free map_node_resources
1560 mn->flags &= ~MF_OBJECT_DELETING;
1561 xq_free(&mn->pending);
1564 r = delete_next_object(peer, pr, map);
1565 if (r != MF_PENDING){
1566 /* if there is no next object to delete, remove the map block
1570 //assert map flags OK
1571 map->flags |= MF_MAP_DESTROYED;
1572 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1573 //make all pending requests on map to fail
/* snapshot qsize so re-queued requests are not drained twice */
1574 uint64_t qsize = xq_count(&map->pending);
1575 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1577 struct peer_req * preq = (struct peer_req *) idx;
1578 dispatch(peer, preq, preq->req, internal);
1580 //free map resources;
1581 remove_map(mapper, map);
1582 mn = find_object(map, 0);
1584 xq_free(&map->pending);
1587 XSEGLOG2(&lc, I, "Handle object delete OK");
/*
 * delete_map: submit an X_DELETE for the map block itself to the map
 * blocker (mapper->mbportno) and mark the map MF_MAP_DELETING.
 * Mirrors delete_object() but targets the volume's map block; on submit
 * failure the request is detached and released.
 */
1591 static int delete_map(struct peerd *peer, struct peer_req *pr,
1595 struct mapperd *mapper = __get_mapperd(peer);
1596 struct mapper_io *mio = __get_mapper_io(pr);
1597 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1598 mapper->mbportno, X_ALLOC);
1601 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1604 char *target = xseg_get_target(peer->xseg, req);
1605 strncpy(target, map->volume, req->targetlen);
1607 req->size = req->datalen;
1610 r = xseg_set_req_data(peer->xseg, req, pr);
/* no map_node is associated with a map-block delete */
1613 __set_copyup_node(mio, req, NULL);
1614 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1617 r = xseg_signal(peer->xseg, p);
1618 map->flags |= MF_MAP_DELETING;
1619 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
/* error path: detach req data and release the request */
1623 xseg_get_req_data(peer->xseg, req, &dummy);
1625 xseg_put_request(peer->xseg, req, pr->portno);
1627 XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
/*
 * handle_map_delete: continuation after the map block's X_DELETE reply.
 *
 * On failure: clears MF_MAP_DELETING, logs, and re-dispatches everything
 * pending on the map. On success: marks the map destroyed and starts
 * deleting its objects via delete_next_object(); if nothing is pending,
 * finishes immediately — failing all queued requests and removing the map.
 * NOTE(review): MF_MAP_DESTROYED appears to be set both before and after
 * object deletion in the visible lines; elided control flow presumably
 * separates the two paths — confirm against the full source.
 */
1631 static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
1632 struct map *map, int err)
1634 struct mapperd *mapper = __get_mapperd(peer);
1635 struct mapper_io *mio = __get_mapper_io(pr);
1639 map->flags &= ~MF_MAP_DELETING;
1641 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1642 //dispatch all pending
1643 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1644 struct peer_req * preq = (struct peer_req *) idx;
1645 dispatch(peer, preq, preq->req, internal);
1648 map->flags |= MF_MAP_DESTROYED;
1649 //delete all objects
1650 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1652 r = delete_next_object(peer, pr, map);
1653 if (r != MF_PENDING){
1654 /* if there is no next object to delete, remove the map block
1657 //assert map flags OK
1658 map->flags |= MF_MAP_DESTROYED;
1659 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1660 //make all pending requests on map to fail
/* snapshot qsize so re-queued requests are not drained twice */
1661 uint64_t qsize = xq_count(&map->pending);
1662 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1664 struct peer_req * preq = (struct peer_req *) idx;
1665 dispatch(peer, preq, preq->req, internal);
1667 //free map resources;
1668 remove_map(mapper, map);
1669 struct map_node *mn = find_object(map, 0);
1672 xq_free(&map->pending);
/*
 * handle_delete: demultiplex an X_DELETE completion.
 *
 * If the reply's target names a cached map, this was the map-block delete:
 * forward to handle_map_delete(). Otherwise the stashed copy-up node
 * identifies an object delete: forward to handle_object_delete().
 * err reflects whether the request failed without being served.
 */
1679 static int handle_delete(struct peerd *peer, struct peer_req *pr,
1680 struct xseg_request *req)
1682 struct mapperd *mapper = __get_mapperd(peer);
1683 struct mapper_io *mio = __get_mapper_io(pr);
1684 struct map_node *mn;
1687 if (req->state & XS_FAILED && !(req->state &XS_SERVED))
/* recover and clear the map_node association made at submit time */
1690 mn = __get_copyup_node(mio, req);
1691 __set_copyup_node(mio, req, NULL);
1692 char *target = xseg_get_target(peer->xseg, req);
1695 map = find_map(mapper, target, req->targetlen);
1697 xseg_put_request(peer->xseg, req, pr->portno);
1700 handle_map_delete(peer, pr, map, err);
1705 xseg_put_request(peer->xseg, req, pr->portno);
1708 handle_object_delete(peer, pr, mn, err);
1710 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * handle_destroy: serve an X_DELETE on a volume — delete the map block,
 * then every object of the map.
 *
 * Internal X_DELETE completions (req != pr->req) are routed to
 * handle_delete(). A fresh request loads the map (possibly pending),
 * then either continues deleting objects (mio->state == DELETING) or
 * kicks off delete_map() and parks pr on the map's pending queue.
 *
 * Fix: the debug buffer was filled with strncpy(buf, target,
 * pr->req->targetlen) but terminated at buf[req->targetlen]. Since req
 * can differ from pr->req here (see the pr->req != req test below), the
 * terminator could land at the wrong index, leaving buf unterminated for
 * the strlen()/log that follows. Terminate at the copied length instead.
 */
1714 static int handle_destroy(struct peerd *peer, struct peer_req *pr,
1715 struct xseg_request *req)
1717 struct mapperd *mapper = __get_mapperd(peer);
1718 struct mapper_io *mio = __get_mapper_io(pr);
1721 char buf[XSEG_MAX_TARGETLEN+1];
1722 char *target = xseg_get_target(peer->xseg, pr->req);
1724 strncpy(buf, target, pr->req->targetlen);
1725 buf[pr->req->targetlen] = 0; /* terminate at the length actually copied */
1727 XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1728 (unsigned long) pr, (unsigned long) pr->req,
1729 (unsigned long) req);
1730 XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
/* internal reply of an X_DELETE we issued earlier */
1731 if (pr->req != req && req->op == X_DELETE) {
1732 //assert mio->state == DELETING
1733 r = handle_delete(peer, pr, req);
1735 XSEGLOG2(&lc, E, "Handle delete returned error");
1744 r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1749 else if (r == MF_PENDING)
1751 if (map->flags & MF_MAP_DESTROYED) {
1752 if (mio->state == DELETING){
1753 XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1757 XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1762 if (mio->state == DELETING) {
1763 //continue deleting map objects;
1764 r = delete_next_object(peer ,pr, map);
1765 if (r != MF_PENDING){
/* first pass: delete the map block itself, then wait for its reply */
1771 r = delete_map(peer, pr, map);
1773 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1776 } else if (r == MF_PENDING) {
1777 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1778 __xq_append_tail(&map->pending, (xqindex) pr);
1779 mio->state = DELETING;
1783 XSEGLOG2(&lc, E, "Destroy unreachable");
/*
 * handle_dropcache: serve an X_CLOSE — evict a map from the in-core cache.
 *
 * Sets MF_MAP_DROPPING_CACHE to block new operations, then walks every
 * object: objects with pending requests park pr and resume later
 * (mio->dcobj tracks progress); idle objects have their queues freed.
 * When all objects are clean, the map is removed, queued requests are
 * re-dispatched, and the map's resources are released.
 */
1788 static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
1789 struct xseg_request *req)
1791 struct mapperd *mapper = __get_mapperd(peer);
1792 struct mapper_io *mio = __get_mapper_io(pr);
1795 char *target = xseg_get_target(peer->xseg, pr->req);
1801 struct map *map = find_map(mapper, target, pr->req->targetlen);
1805 } else if (map->flags & MF_MAP_DESTROYED) {
/* wait for in-flight map operations unless we are already draining */
1808 } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1809 __xq_append_tail(&map->pending, (xqindex) pr);
1813 if (mio->state != DROPPING_CACHE) {
1814 /* block all future operations on the map */
1815 map->flags |= MF_MAP_DROPPING_CACHE;
1817 mio->state = DROPPING_CACHE;
1818 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1820 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1823 struct map_node *mn;
/* resume from mio->dcobj — progress saved across pending waits */
1825 for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1826 mn = find_object(map, i);
1830 if (xq_count(&mn->pending) != 0){
1831 XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu",
1832 map->volume, (unsigned long long) mn->objectidx);
1833 __xq_append_tail(&mn->pending, (xqindex) pr);
1836 xq_free(&mn->pending);
1837 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu",
1838 map->volume, (unsigned long long) mn->objectidx);
1840 remove_map(mapper, map);
/* snapshot qsize so re-queued requests are not drained twice */
1842 uint64_t qsize = xq_count(&map->pending);
1843 while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1845 struct peer_req * preq = (struct peer_req *) i;
1846 dispatch(peer, preq, preq->req, internal);
1848 XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1850 //free map resources;
1851 mn = find_object(map, 0);
1854 xq_free(&map->pending);
/*
 * dispatch: main entry point for every request the mapper handles.
 *
 * reason == accept resets the per-pr state machine to ACCEPTED.
 * X_READ replies are map-block reads and go straight to handle_mapread();
 * everything else is routed by the ORIGINAL operation (pr->req->op), so
 * that internal replies (req != pr->req) resume the right handler.
 *
 * Fix: the default-case diagnostic said "unknown up" — typo for
 * "unknown op".
 */
1862 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1863 enum dispatch_reason reason)
1865 struct mapperd *mapper = __get_mapperd(peer);
1867 struct mapper_io *mio = __get_mapper_io(pr);
1870 if (reason == accept)
1871 mio->state = ACCEPTED;
1873 if (req->op == X_READ) {
1874 /* catch map reads requests here */
1875 handle_mapread(peer, pr, req);
/* route on the accepted request's op, not the (possibly internal) reply's */
1879 switch (pr->req->op) {
1880 /* primary xseg operations of mapper */
1881 case X_CLONE: handle_clone(peer, pr, req); break;
1882 case X_MAPR: handle_mapr(peer, pr, req); break;
1883 case X_MAPW: handle_mapw(peer, pr, req); break;
1884 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1885 case X_INFO: handle_info(peer, pr, req); break;
1886 case X_DELETE: handle_destroy(peer, pr, req); break;
1887 case X_CLOSE: handle_dropcache(peer, pr, req); break;
1888 default: fprintf(stderr, "mydispatch: unknown op\n"); break;
/*
 * custom_peer_init: one-time mapper setup.
 *
 * Initializes libgcrypt (thread callbacks, version check, secmem off),
 * precomputes the magic-string SHA256 and the hexlified hash of an
 * all-zero block, allocates the mapperd state and one mapper_io per
 * peer_req slot, parses -bp/-mbp/-t arguments, and requests SCHED_FIFO.
 *
 * Fix: the sched_setscheduler() call contained a mis-encoded third
 * argument ("¶m", an HTML-entity mangling of "&param"); restored.
 * FIXMEs for missing malloc/xhash error checks are original and kept.
 */
1893 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1896 unsigned char buf[SHA256_DIGEST_SIZE];
/* libgcrypt must use pthread-aware locking before any other gcry call */
1899 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1901 /* Version check should be the very first call because it
1902 makes sure that important subsystems are initialized. */
1903 gcry_check_version (NULL);
1905 /* Disable secure memory. */
1906 gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1908 /* Tell Libgcrypt that initialization has completed. */
1909 gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1911 /* calculate out magic sha hash value */
1912 gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1914 /* calculate zero block */
1915 //FIXME check hash value
1916 zero = malloc(block_size);
1917 memset(zero, 0, block_size);
1918 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
/* hexlify: two chars per digest byte into the global zero_block */
1919 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1920 sprintf(zero_block + 2*i, "%02x", buf[i]);
1921 printf("%s \n", zero_block);
1924 //FIXME error checks
1925 struct mapperd *mapper = malloc(sizeof(struct mapperd));
1926 mapper->hashmaps = xhash_new(3, STRING);
1927 peer->priv = mapper;
/* one mapper_io (with its own copyup-node hash) per peer_req slot */
1929 for (i = 0; i < peer->nr_ops; i++) {
1930 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1931 mio->copyups_nodes = xhash_new(3, INTEGER);
1934 peer->peer_reqs[i].priv = mio;
1937 for (i = 0; i < argc; i++) {
1938 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1939 mapper->bportno = atoi(argv[i+1]);
1943 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1944 mapper->mbportno = atoi(argv[i+1]);
1948 /* enforce only one thread */
1949 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1950 int t = atoi(argv[i+1]);
1952 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
/* NOTE(review): sched_setscheduler(2) takes a PID; passing gettid() works
 * for the calling thread on Linux but is thread-specific — confirm intent */
1960 const struct sched_param param = { .sched_priority = 99 };
1961 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
/*
 * print_obj: debug helper — dump one map_node (index, name, length,
 * existence flag) to stderr.
 */
1969 void print_obj(struct map_node *mn)
1971 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
1972 (unsigned long long) mn->objectidx, mn->object,
1973 (unsigned int) mn->objectlen,
1974 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * print_map: debug helper — dump a map's header (name, size, object count,
 * rounding partial blocks up) and every object node to stderr.
 * Bails out for absurd object counts (> 1e6) as a guard against a
 * corrupted volume size.
 */
1977 void print_map(struct map *m)
1979 uint64_t nr_objs = m->size/block_size;
/* round up: a trailing partial block still needs an object */
1980 if (m->size % block_size)
1982 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
1983 m->volume, m->volumelen,
1984 (unsigned long long) m->size,
1985 (unsigned long long) nr_objs);
1987 struct map_node *mn;
1988 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1990 for (i = 0; i < nr_objs; i++) {
1991 mn = find_object(m, i);
1993 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2001 void test_map(struct peerd *peer)
2004 //struct sha256_ctx sha256ctx;
2005 unsigned char buf[SHA256_DIGEST_SIZE];
2006 char buf_new[XSEG_MAX_TARGETLEN + 20];
2007 struct map *m = malloc(sizeof(struct map));
2008 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2009 m->volume[XSEG_MAX_TARGETLEN] = 0;
2010 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2011 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2012 m->volumelen = XSEG_MAX_TARGETLEN;
2013 m->size = 100*block_size;
2014 m->objects = xhash_new(3, INTEGER);
2015 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2016 for (i = 0; i < 100; i++) {
2017 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2018 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2020 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2021 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2023 map_node[i].objectidx = i;
2024 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2025 map_node[i].flags = MF_OBJECT_EXIST;
2026 ret = insert_object(m, &map_node[i]);
2029 char *data = malloc(block_size);
2030 mapheader_to_map(m, data);
2031 uint64_t pos = mapheader_size;
2033 for (i = 0; i < 100; i++) {
2034 map_node = find_object(m, i);
2036 printf("no object node %d \n", i);
2039 object_to_map(data+pos, map_node);
2040 pos += objectsize_in_map;
2044 struct map *m2 = malloc(sizeof(struct map));
2045 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2046 m->volume[XSEG_MAX_TARGETLEN] = 0;
2047 m->volumelen = XSEG_MAX_TARGETLEN;
2049 m2->objects = xhash_new(3, INTEGER);
2050 ret = read_map(peer, m2, data);
2053 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2055 while (sum < block_size) {
2056 r = write(fd, data + sum, block_size -sum);
2059 printf("write error\n");
2065 map_node = find_object(m, 0);