8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
16 #include <sys/syscall.h>
18 //GCRY_THREAD_OPTION_PTHREAD_IMPL;
/* Generic request/operation flags passed to map open/load helpers. */
19 #define MF_LOAD (1 << 0)
20 #define MF_EXCLUSIVE (1 << 1)
21 #define MF_FORCE (1 << 2)
22 #define MF_ARCHIP (1 << 3)
24 #define SHA256_DIGEST_SIZE 32
25 /* hex representation of sha256 value takes up double the sha256 size */
26 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
/* Prefix that distinguishes archipelago maps from legacy pithos maps. */
28 #define MAPPER_PREFIX "archip_"
29 #define MAPPER_PREFIX_LEN 7
31 #define block_size (1<<22) //FIXME this should be defined here?
33 /* transparency byte + max object len */
34 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN)
/* on-disk map header currently holds only the 64-bit volume size */
37 #define mapheader_size (sizeof(uint64_t))
39 /* reserve enough space to hold _%u object idx */
40 #define MAX_VOLUME_LEN (XSEG_MAX_TARGETLEN - 20)
/* Per-object (map_node) state flags. */
42 #define MF_OBJECT_EXIST (1 << 0)
43 #define MF_OBJECT_COPYING (1 << 1)
44 #define MF_OBJECT_WRITING (1 << 2)
45 #define MF_OBJECT_DELETING (1 << 3)
46 #define MF_OBJECT_DELETED (1 << 4)
47 #define MF_OBJECT_DESTROYED (1 << 5)
/* An object with any of these bits set has an in-flight operation. */
49 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
51 //char *magic_string = "This a magic string. Please hash me";
52 //unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
53 //char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
/* Sentinel object name meaning "block of zeros; no backing object". */
55 char *zero_block="zeroblock";
56 #define ZERO_BLOCK_LEN 9 /* strlen(zero_block) */
58 //dispatch_internal mapper states
/* Completion callback invoked when an xseg reply arrives for a peer request. */
67 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
/* NOTE(review): field of struct map_node; the struct header is elided in this chunk. */
73 	char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
/* Per-map state flags. */
80 #define MF_MAP_LOADING (1 << 0)
81 #define MF_MAP_DESTROYED (1 << 1)
82 #define MF_MAP_WRITING (1 << 2)
83 #define MF_MAP_DELETING (1 << 3)
84 #define MF_MAP_DROPPING_CACHE (1 << 4)
85 #define MF_MAP_EXCLUSIVE (1 << 5)
86 #define MF_MAP_OPENING (1 << 6)
87 #define MF_MAP_CLOSING (1 << 7)
/* A map with any of these bits set has an in-flight metadata operation. */
89 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
90 				MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
/*
 * Green-thread wait/signal helpers built on State Threads condition
 * variables (st_cond_wait/st_cond_signal/st_cond_broadcast).  `ta` is
 * presumably a global count of runnable tasks — TODO confirm.
 * NOTE(review): several backslash-continued lines of these macros are
 * elided in this chunk; comments are added only before each macro so
 * the continuations stay intact.
 */
92 #define wait_on_pr(__pr, __condition__) \
93 	while (__condition__){ \
95 		__get_mapper_io(pr)->active = 0;\
96 		XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
97 		st_cond_wait(__pr->cond); \
100 #define wait_on_mapnode(__mn, __condition__) \
101 	while (__condition__){ \
104 		XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
105 		st_cond_wait(__mn->cond); \
108 #define wait_on_map(__map, __condition__) \
109 	while (__condition__){ \
112 		XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
113 		st_cond_wait(__map->cond); \
116 #define signal_pr(__pr) \
118 		if (!__get_mapper_io(pr)->active){\
120 			XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta); \
121 			__get_mapper_io(pr)->active = 1;\
122 			st_cond_signal(__pr->cond); \
126 #define signal_map(__map) \
128 		if (__map->waiters) { \
130 			XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
132 			st_cond_signal(__map->cond); \
136 #define signal_mapnode(__mn) \
138 		if (__mn->waiters) { \
139 			ta += __mn->waiters; \
140 			XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
142 			st_cond_broadcast(__mn->cond); \
/* NOTE(review): fields of struct map; the struct header is elided in this chunk. */
151 	char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
152 	xhash_t *objects; /* obj_index --> map_node */
/* NOTE(review): fields of struct mapperd (peer-wide state) — header elided. */
159 	xport bportno;	/* blocker that accesses data */
160 	xport mbportno;	/* blocker that accesses maps */
161 	xhash_t *hashmaps; // hash_function(target) --> struct map
/* NOTE(review): fields of struct mapper_io (per-request state) — header elided. */
165 	volatile uint32_t copyups;	/* nr of copyups pending, issued by this mapper io */
166 	xhash_t *copyups_nodes;		/* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
167 	struct map_node *copyup_node;
168 	volatile int err;		/* error flag */
169 	volatile uint64_t del_pending;
173 	enum mapper_state state;
178 	struct mapperd *mapper;
/* Debug helper defined elsewhere in the file. */
180 void print_map(struct map *m);
/* Print the mapper-specific command line options on stderr. */
183 void custom_peer_usage()
185 	fprintf(stderr, "Custom peer options: \n"
186 			"-bp  : port for block blocker(!)\n"
187 			"-mbp : port for map blocker\n"
196 static inline struct mapperd * __get_mapperd(struct peerd *peer)
198 return (struct mapperd *) peer->priv;
201 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
203 return (struct mapper_io *) pr->priv;
206 static inline uint64_t calc_map_obj(struct map *map)
210 uint64_t nr_objs = map->size / block_size;
211 if (map->size % block_size)
216 static uint32_t calc_nr_obj(struct xseg_request *req)
219 uint64_t rem_size = req->size;
220 uint64_t obj_offset = req->offset & (block_size -1); //modulo
221 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
222 rem_size -= obj_size;
223 while (rem_size > 0) {
224 obj_size = (rem_size > block_size) ? block_size : rem_size;
225 rem_size -= obj_size;
233 * Maps handling functions
236 static struct map * find_map(struct mapperd *mapper, char *volume)
238 struct map *m = NULL;
239 int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
/*
 * Look up a map by a (target, targetlen) pair that is not necessarily
 * NUL-terminated.  When MF_ARCHIP is set the MAPPER_PREFIX is prepended
 * before the lookup.  The strncpy calls are bounded by caller-provided
 * lengths and buf is explicitly NUL-terminated (termination of the
 * non-prefixed branch is on a line elided from this chunk — verify).
 */
246 static struct map * find_map_len(struct mapperd *mapper, char *target,
247 					uint32_t targetlen, uint32_t flags)
249 	char buf[XSEG_MAX_TARGETLEN+1];
250 	if (flags & MF_ARCHIP){
251 		strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
252 		strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
253 		buf[MAPPER_PREFIX_LEN + targetlen] = 0;
254 		targetlen += MAPPER_PREFIX_LEN;
257 		strncpy(buf, target, targetlen);
261 	if (targetlen > MAX_VOLUME_LEN){
262 		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
263 					targetlen, MAX_VOLUME_LEN);
267 	XSEGLOG2(&lc, D, "looking up map %s, len %u",
269 	return find_map(mapper, buf);
/*
 * Insert a map into the global volume hash, growing the hash table on
 * -XHASH_ERESIZE until the insert succeeds.  Fails if a map with the
 * same volume name is already cached.
 */
273 static int insert_map(struct mapperd *mapper, struct map *map)
277 	if (find_map(mapper, map->volume)){
278 		XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
282 	XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
283 			map->volume, strlen(map->volume), (unsigned long) map);
284 	r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
285 	while (r == -XHASH_ERESIZE) {
286 		xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
287 		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
289 			XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
290 					(unsigned long long) shift);
293 		mapper->hashmaps = new_hashmap;
294 		r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/*
 * Remove a map from the global volume hash, shrinking the hash table on
 * -XHASH_ERESIZE until the delete succeeds.  Callers must ensure no
 * peer request is still pending on the map.
 */
300 static int remove_map(struct mapperd *mapper, struct map *map)
304 	//assert no pending pr on map
306 	r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
307 	while (r == -XHASH_ERESIZE) {
308 		xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
309 		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
311 			XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
312 					(unsigned long long) shift);
315 		mapper->hashmaps = new_hashmap;
316 		r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/*
 * Build and submit an asynchronous close request for a map to the map
 * blocker port (mbportno).  Returns the in-flight request on success;
 * the error-unwind path (elided lines) clears req data and puts the
 * request back before returning NULL.
 */
322 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
326 	struct peerd *peer = pr->peer;
327 	struct xseg_request *req;
328 	struct mapperd *mapper = __get_mapperd(peer);
331 	XSEGLOG2(&lc, I, "Closing map %s", map->volume);
333 	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
335 		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
340 	r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
342 		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
347 	char *reqtarget = xseg_get_target(peer->xseg, req);
350 	strncpy(reqtarget, map->volume, req->targetlen);
352 	req->size = block_size;
354 	r = xseg_set_req_data(peer->xseg, req, pr);
356 		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
360 	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
362 		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
366 	r = xseg_signal(peer->xseg, p);
368 	XSEGLOG2(&lc, I, "Map %s closing", map->volume);
372 	xseg_get_req_data(peer->xseg, req, &dummy);
374 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Synchronously close a map: mark it MF_MAP_CLOSING, submit the close
 * via __close_map(), block the green thread until the request is served
 * or fails, then release the request.  Returns non-zero on failure.
 */
379 static int close_map(struct peer_req *pr, struct map *map)
382 	struct xseg_request *req;
383 	struct peerd *peer = pr->peer;
385 	map->flags |= MF_MAP_CLOSING;
386 	req = __close_map(pr, map);
389 	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
390 	map->flags &= ~MF_MAP_CLOSING;
391 	err = req->state & XS_FAILED;
392 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Return the cached map for (target, targetlen) in *m, queueing the
 * request if the map exists but is busy; otherwise start loading it.
 * NOTE(review): find_map() is called here with three arguments and
 * open_map() with five, which does not match the two- and three-argument
 * definitions visible elsewhere in this file — looks like code from an
 * older revision; confirm against the full source.
 */
399 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
400 				char *target, uint32_t targetlen, struct map **m)
402 	struct mapperd *mapper = __get_mapperd(peer);
404 	*m = find_map(mapper, target, targetlen);
406 		XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
407 		if ((*m)->flags & MF_MAP_NOT_READY) {
408 			__xq_append_tail(&(*m)->pending, (xqindex) pr);
409 			XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
411 		//} else if ((*m)->flags & MF_MAP_DESTROYED){
415 			XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
419 	r = open_map(peer, pr, target, targetlen, 0);
426 * Object handling functions
429 struct map_node *find_object(struct map *map, uint64_t obj_index)
432 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/*
 * Insert a map_node into the map's object hash keyed by object index,
 * growing the hash once on -XHASH_ERESIZE.  FIXME notes below are from
 * the original author: no duplicate check, no resize-failure check.
 */
438 static int insert_object(struct map *map, struct map_node *mn)
440 	//FIXME no find object first
441 	int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
442 	if (r == -XHASH_ERESIZE) {
443 		unsigned long shift = xhash_grow_size_shift(map->objects);
444 		map->objects = xhash_resize(map->objects, shift, NULL);
447 		r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
454 * map read/write functions
456 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
459 //hexlify sha256 value
460 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
461 sprintf(mn->object+2*i, "%02x", buf[i]);
464 mn->object[SHA256_DIGEST_SIZE * 2] = 0;
465 mn->objectlen = SHA256_DIGEST_SIZE * 2;
466 mn->flags = MF_OBJECT_EXIST;
/*
 * Decode one archipelago map entry from its on-disk form: a leading
 * transparency byte (consumed on lines elided from this chunk) followed
 * by the object name.  NOTE(review): the elided lines presumably reset
 * mn->flags from the transparency byte before the |= below — confirm.
 */
469 static inline void map_to_object(struct map_node *mn, char *buf)
474 		mn->flags |= MF_OBJECT_EXIST;
475 	memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
476 	mn->object[XSEG_MAX_TARGETLEN] = 0;
477 	mn->objectlen = strlen(mn->object);
480 static inline void object_to_map(char* buf, struct map_node *mn)
482 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
483 memcpy(buf+1, mn->object, mn->objectlen);
484 /* zero out the rest of the buffer */
485 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen);
488 static inline void mapheader_to_map(struct map *m, char *buf)
491 // memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
492 // pos += SHA256_DIGEST_SIZE;
493 memcpy(buf + pos, &m->size, sizeof(m->size));
494 pos += sizeof(m->size);
/*
 * Submit an asynchronous write of a single map entry (one object slot)
 * into the map file on the map blocker.  The entry is placed at
 * mapheader_size + objectidx * objectsize_in_map.  Returns the
 * in-flight request, or NULL after the error-unwind path (elided
 * lines) releases the request.
 */
498 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
499 				struct map *map, struct map_node *mn)
502 	struct mapperd *mapper = __get_mapperd(peer);
503 	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
504 						mapper->mbportno, X_ALLOC);
506 		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
508 				mn->object, map->volume, (unsigned long long) mn->objectidx);
511 	int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
513 		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
515 				mn->object, map->volume, (unsigned long long) mn->objectidx);
518 	char *target = xseg_get_target(peer->xseg, req);
519 	strncpy(target, map->volume, req->targetlen);
520 	req->size = objectsize_in_map;
521 	req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
523 	char *data = xseg_get_data(peer->xseg, req);
524 	object_to_map(data, mn);
526 	r = xseg_set_req_data(peer->xseg, req, pr);
528 		XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
530 				mn->object, map->volume, (unsigned long long) mn->objectidx);
533 	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
535 		XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
537 				mn->object, map->volume, (unsigned long long) mn->objectidx);
540 	r = xseg_signal(peer->xseg, p);
542 		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
544 	XSEGLOG2(&lc, I, "Writing object %s \n\t"
546 			mn->object, map->volume, (unsigned long long) mn->objectidx);
551 	xseg_get_req_data(peer->xseg, req, &dummy);
553 	xseg_put_request(peer->xseg, req, pr->portno);
555 	XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
557 			mn->object, map->volume, (unsigned long long) mn->objectidx);
/*
 * Submit an asynchronous write of the whole map file: header followed
 * by one serialized entry per object up to calc_map_obj(map).  Every
 * object must already be cached; a missing map_node aborts the write.
 * Marks the map MF_MAP_WRITING on successful submission and returns the
 * in-flight request, or NULL after the error-unwind path (elided lines)
 * releases the request.
 */
561 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
564 	struct peerd *peer = pr->peer;
565 	struct mapperd *mapper = __get_mapperd(peer);
567 	uint64_t i, pos, max_objidx = calc_map_obj(map);
568 	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
569 							mapper->mbportno, X_ALLOC);
571 		XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
574 	int r = xseg_prep_request(peer->xseg, req, map->volumelen,
575 			mapheader_size + max_objidx * objectsize_in_map);
577 		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
580 	char *target = xseg_get_target(peer->xseg, req);
581 	strncpy(target, map->volume, req->targetlen);
582 	char *data = xseg_get_data(peer->xseg, req);
583 	mapheader_to_map(map, data);
584 	pos = mapheader_size;
586 	req->size = req->datalen;
589 	if (map->size % block_size)
591 	for (i = 0; i < max_objidx; i++) {
592 		mn = find_object(map, i);
594 			XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
595 					(unsigned long long) i, map->volume);
598 		object_to_map(data+pos, mn);
599 		pos += objectsize_in_map;
601 	r = xseg_set_req_data(peer->xseg, req, pr);
603 		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
607 	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
609 		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
613 	r = xseg_signal(peer->xseg, p);
615 		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
617 	map->flags |= MF_MAP_WRITING;
618 	XSEGLOG2(&lc, I, "Writing map %s", map->volume);
622 	xseg_get_req_data(peer->xseg, req, &dummy);
624 	xseg_put_request(peer->xseg, req, pr->portno);
626 	XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/*
 * Synchronously write a map: mark it MF_MAP_WRITING, submit via
 * __write_map(), block the green thread until the request completes,
 * then release the request and clear the flag.
 */
630 static int write_map(struct peer_req* pr, struct map *map)
633 	struct peerd *peer = pr->peer;
634 	map->flags |= MF_MAP_WRITING;
635 	struct xseg_request *req = __write_map(pr, map);
638 	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
639 	if (req->state & XS_FAILED)
641 	xseg_put_request(peer->xseg, req, pr->portno);
642 	map->flags &= ~MF_MAP_WRITING;
/*
 * Submit an asynchronous read of a whole map block (block_size bytes at
 * offset 0) from the map blocker.  Returns the in-flight request, or
 * NULL after the error-unwind path (elided lines) releases it.
 */
646 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
650 	struct xseg_request *req;
651 	struct peerd *peer = pr->peer;
652 	struct mapperd *mapper = __get_mapperd(peer);
655 	XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
657 	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
659 		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
664 	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
666 		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
671 	char *reqtarget = xseg_get_target(peer->xseg, req);
674 	strncpy(reqtarget, m->volume, req->targetlen);
676 	req->size = block_size;
678 	r = xseg_set_req_data(peer->xseg, req, pr);
680 		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
684 	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
686 		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
690 	r = xseg_signal(peer->xseg, p);
692 	XSEGLOG2(&lc, I, "Map %s loading", m->volume);
696 	xseg_get_req_data(peer->xseg, req, &dummy);
698 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Parse a map block read from storage into cached map_nodes.
 * An all-zero prefix means an empty/absent map.  The MAPPER_PREFIX on
 * the volume name selects the format: type 1 = archipelago (explicit
 * header with size + fixed-width entries), type 0 = legacy pithos
 * (raw sha256 values until an all-zero entry; map size is derived from
 * the entry count).  FIXME notes below are the original author's:
 * st_cond_new() and insert_object() results are not checked, and there
 * is no cleanup on error.
 */
703 static int read_map (struct map *map, char *buf)
705 	char nulls[SHA256_DIGEST_SIZE];
706 	memset(nulls, 0, SHA256_DIGEST_SIZE);
708 	int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
710 		XSEGLOG2(&lc, D, "Read zeros");
714 	//type 1, our type, type 0 pithos map
715 	int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
716 	XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
719 	struct map_node *map_node;
722 		map->size = *(uint64_t *) (buf + pos);
723 		pos += sizeof(uint64_t);
724 		nr_objs = map->size / block_size;
725 		if (map->size % block_size)
727 		map_node = calloc(nr_objs, sizeof(struct map_node));
731 		for (i = 0; i < nr_objs; i++) {
732 			map_node[i].map = map;
733 			map_node[i].objectidx = i;
734 			map_node[i].waiters = 0;
736 			map_node[i].cond = st_cond_new(); //FIXME err check;
737 			map_to_object(&map_node[i], buf + pos);
738 			pos += objectsize_in_map;
739 			r = insert_object(map, &map_node[i]); //FIXME error check
743 		uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
744 		map_node = calloc(max_nr_objs, sizeof(struct map_node));
747 		for (i = 0; i < max_nr_objs; i++) {
748 			if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
750 			map_node[i].objectidx = i;
751 			map_node[i].map = map;
752 			map_node[i].waiters = 0;
754 			map_node[i].cond = st_cond_new(); //FIXME err check;
755 			pithosmap_to_object(&map_node[i], buf + pos);
756 			pos += SHA256_DIGEST_SIZE;
757 			r = insert_object(map, &map_node[i]); //FIXME error check
759 		map->size = i * block_size;
762 	XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
765 	//FIXME cleanup on error
/*
 * Synchronously load a map: mark it MF_MAP_LOADING, submit the read via
 * __load_map(), block the green thread until completion, then parse the
 * returned block with read_map() and release the request.
 */
768 static int load_map(struct peer_req *pr, struct map *map)
771 	struct xseg_request *req;
772 	struct peerd *peer = pr->peer;
773 	map->flags |= MF_MAP_LOADING;
774 	req = __load_map(pr, map);
777 	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
778 	map->flags &= ~MF_MAP_LOADING;
779 	if (req->state & XS_FAILED){
780 		XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
781 		xseg_put_request(peer->xseg, req, pr->portno);
784 	r = read_map(map, xseg_get_data(peer->xseg, req));
785 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Submit an asynchronous open (acquire) request for a map to the map
 * blocker.  Unless MF_FORCE is set, XF_NOSYNC makes the acquire
 * best-effort.  Returns the in-flight request, or NULL after the
 * error-unwind path (elided lines) releases it.
 */
789 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
794 	struct xseg_request *req;
795 	struct peerd *peer = pr->peer;
796 	struct mapperd *mapper = __get_mapperd(peer);
799 	XSEGLOG2(&lc, I, "Opening map %s", m->volume);
801 	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
803 		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
808 	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
810 		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
815 	char *reqtarget = xseg_get_target(peer->xseg, req);
818 	strncpy(reqtarget, m->volume, req->targetlen);
820 	req->size = block_size;
822 	if (!(flags & MF_FORCE))
823 		req->flags = XF_NOSYNC;
824 	r = xseg_set_req_data(peer->xseg, req, pr);
826 		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
830 	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
832 		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
836 	r = xseg_signal(peer->xseg, p);
838 	XSEGLOG2(&lc, I, "Map %s opening", m->volume);
842 	xseg_get_req_data(peer->xseg, req, &dummy);
844 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Synchronously open (acquire) a map: mark it MF_MAP_OPENING, submit
 * via __open_map(), block until completion, then release the request.
 * On success the map is marked MF_MAP_EXCLUSIVE (this peer holds the
 * lock on the map file).
 */
849 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
852 	struct xseg_request *req;
853 	struct peerd *peer = pr->peer;
855 	map->flags |= MF_MAP_OPENING;
856 	req = __open_map(pr, map, flags);
859 	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
860 	map->flags &= ~MF_MAP_OPENING;
861 	err = req->state & XS_FAILED;
862 	xseg_put_request(peer->xseg, req, pr->portno);
866 	map->flags |= MF_MAP_EXCLUSIVE;
/*
 * Associate (mn != NULL) or dissociate (mn == NULL) a map_node with an
 * in-flight xseg request in mio->copyups_nodes, so the completion
 * callback can find which object the reply belongs to.  Grows/shrinks
 * the hash once on -XHASH_ERESIZE.
 */
874 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
878 		r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
879 		if (r == -XHASH_ERESIZE) {
880 			xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
881 			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
884 			mio->copyups_nodes = new_hashmap;
885 			r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
889 		r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
890 		if (r == -XHASH_ERESIZE) {
891 			xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
892 			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
895 			mio->copyups_nodes = new_hashmap;
896 			r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
903 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
906 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
/*
 * Start a copy-on-write for an object that does not yet exist as its
 * own backing object.  The new object name is "<volume>_<objectidx>".
 * If the source is the zero_block sentinel, no copy is needed and the
 * new map entry is written directly (copyup_zeroblock path); otherwise
 * an X_COPY request is submitted to the data blocker and the object is
 * marked MF_OBJECT_COPYING.  Returns the in-flight request or NULL.
 */
912 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
914 	struct mapperd *mapper = __get_mapperd(peer);
915 	struct mapper_io *mio = __get_mapper_io(pr);
916 	struct map *map = mn->map;
921 	uint32_t newtargetlen;
922 	char new_target[XSEG_MAX_TARGETLEN + 1]; 
923 	strncpy(new_target, map->volume, map->volumelen);
924 	sprintf(new_target + map->volumelen, "_%u", mn->objectidx);
925 	new_target[XSEG_MAX_TARGETLEN] = 0;
926 	newtargetlen = strlen(new_target);
928 	if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
929 		goto copyup_zeroblock;
931 	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
932 						mapper->bportno, X_ALLOC);
934 		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
937 	r = xseg_prep_request(peer->xseg, req, newtargetlen, 
938 				sizeof(struct xseg_request_copy));
940 		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
944 	char *target = xseg_get_target(peer->xseg, req);
945 	strncpy(target, new_target, req->targetlen);
947 	struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
948 	strncpy(xcopy->target, mn->object, mn->objectlen);
949 	xcopy->targetlen = mn->objectlen;
952 	req->size = block_size;
954 	r = xseg_set_req_data(peer->xseg, req, pr);
956 		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
959 	r = __set_copyup_node(mio, req, mn);
960 	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
962 		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
965 	xseg_signal(peer->xseg, p);
968 	mn->flags |= MF_OBJECT_COPYING;
969 	XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
973 	r = __set_copyup_node(mio, req, NULL);
974 	xseg_get_req_data(peer->xseg, req, &dummy);
976 	xseg_put_request(peer->xseg, req, pr->portno);
978 	XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
982 	XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
983 			"Proceeding in writing the new object in map");
984 	/* construct a tmp map_node for writing purposes */
985 	struct map_node newmn = *mn;
986 	newmn.flags = MF_OBJECT_EXIST;
987 	strncpy(newmn.object, new_target, newtargetlen);
988 	newmn.object[newtargetlen] = 0;
989 	newmn.objectlen = newtargetlen;
990 	newmn.objectidx = mn->objectidx; 
991 	req = object_write(peer, pr, map, &newmn);
992 	r = __set_copyup_node(mio, req, mn);
994 		XSEGLOG2(&lc, E, "Object write returned error for object %s"
995 				"\n\t of map %s [%llu]",
996 				mn->object, map->volume, (unsigned long long) mn->objectidx);
999 	mn->flags |= MF_OBJECT_WRITING;
1000 	XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/*
 * Submit an asynchronous delete of a single backing object to the data
 * blocker, registering the map_node in the copyup hash so the deletion
 * callback can locate it.  Returns the in-flight request, or NULL after
 * the error-unwind path (elided lines) releases it.
 */
1004 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
1007 	struct peerd *peer = pr->peer;
1008 	struct mapperd *mapper = __get_mapperd(peer);
1009 	struct mapper_io *mio = __get_mapper_io(pr);
1010 	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1011 							mapper->bportno, X_ALLOC);
1012 	XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1014 		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1017 	int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1019 		XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1022 	char *target = xseg_get_target(peer->xseg, req);
1023 	strncpy(target, mn->object, req->targetlen);
1025 	req->size = req->datalen;
1027 	r = xseg_set_req_data(peer->xseg, req, pr);
1029 		XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1032 	__set_copyup_node(mio, req, mn);
1033 	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1035 		XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1038 	r = xseg_signal(peer->xseg, p);
1039 	XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1043 	xseg_get_req_data(peer->xseg, req, &dummy);
1045 	xseg_put_request(peer->xseg, req, pr->portno);
1047 	XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
/*
 * Submit an asynchronous delete of the map file itself to the map
 * blocker and mark the map MF_MAP_DELETING.  The copyup-node slot is
 * cleared (NULL) since no object corresponds to this request.  Returns
 * the in-flight request, or NULL after the error-unwind path (elided
 * lines) releases it.
 */
1051 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1054 	struct peerd *peer = pr->peer;
1055 	struct mapperd *mapper = __get_mapperd(peer);
1056 	struct mapper_io *mio = __get_mapper_io(pr);
1057 	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1058 							mapper->mbportno, X_ALLOC);
1059 	XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1061 		XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1064 	int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1066 		XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1069 	char *target = xseg_get_target(peer->xseg, req);
1070 	strncpy(target, map->volume, req->targetlen);
1072 	req->size = req->datalen;
1074 	r = xseg_set_req_data(peer->xseg, req, pr);
1076 		XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1079 	__set_copyup_node(mio, req, NULL);
1080 	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1082 		XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1085 	r = xseg_signal(peer->xseg, p);
1086 	map->flags |= MF_MAP_DELETING;
1087 	XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1091 	xseg_get_req_data(peer->xseg, req, &dummy);
1093 	xseg_put_request(peer->xseg, req, pr->portno);
1095 	XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
/*
 * Reference-counting helpers for cached map_nodes and maps.  The ref
 * increment/decrement lines are elided from this chunk; get_mapnode
 * presumably bumps mn->ref and put_mapnode frees the node (destroying
 * its condition variable) when the count drops to zero — confirm
 * against the full source.
 */
1100 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1102 	struct map_node *mn = find_object(map, index);
1108 static inline void put_mapnode(struct map_node *mn)
1113 		st_cond_destroy(mn->cond);
1117 static inline void __get_map(struct map *map)
/*
 * Drop a reference on a map; when the count reaches zero (check elided
 * from this chunk) free every cached map_node — waiting out any pending
 * per-object operation first — and then the map itself.
 * NOTE(review): `mn->flags &= MF_OBJECT_DESTROYED` keeps only the
 * DESTROYED bit instead of setting it; `|=` was probably intended —
 * confirm against the full source.
 */
1122 static inline void put_map(struct map *map)
1124 	struct map_node *mn;
1127 	XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1130 	for (i = 0; i < calc_map_obj(map); i++) {
1131 		mn = get_mapnode(map, i);
1133 			//make sure all pending operations on all objects are completed
1134 			if (mn->flags & MF_OBJECT_NOT_READY){
1135 				//this should never happen...
1136 				wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1138 			mn->flags &= MF_OBJECT_DESTROYED;
1139 			put_mapnode(mn); //matching mn->ref = 1 on mn init
1140 			put_mapnode(mn); //matching get_mapnode;
1141 			//assert mn->ref == 0;
1144 	mn = find_object(map, 0);
1147 	XSEGLOG2(&lc, I, "Freed map %s", map->volume);
/*
 * Allocate and initialize a new in-core map named after (name, namelen)
 * — with MAPPER_PREFIX prepended when MF_ARCHIP is set — create its
 * object hash and condition variable, and insert it into the global
 * volume hash.  Returns the new map or NULL; the error path frees the
 * object hash and the map (unwind lines partly elided in this chunk).
 */
1152 static struct map * create_map(struct mapperd *mapper, char *name,
1153 				uint32_t namelen, uint32_t flags)
1156 	if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1157 		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1158 					namelen, MAX_VOLUME_LEN);
1161 	struct map *m = malloc(sizeof(struct map));
1163 		XSEGLOG2(&lc, E, "Cannot allocate map ");
1167 	if (flags & MF_ARCHIP){
1168 		strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1169 		strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1170 		m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1171 		m->volumelen = MAPPER_PREFIX_LEN + namelen;
1174 		strncpy(m->volume, name, namelen);
1175 		m->volume[namelen] = 0;
1176 		m->volumelen = namelen;
1179 	m->objects = xhash_new(3, INTEGER); 
1181 		XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1187 	m->cond = st_cond_new(); //FIXME err check;
1188 	r = insert_map(mapper, m);
1190 		XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1197 	xhash_free(m->objects);
1199 	XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
/*
 * Completion callback for object deletion requests: look up the
 * map_node associated with the reply, record failure if the request
 * failed (handling lines elided), and release the request.
 */
1207 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1209 	struct peerd *peer = pr->peer;
1210 	struct mapperd *mapper = __get_mapperd(peer);
1212 	struct mapper_io *mio = __get_mapper_io(pr);
1213 	struct map_node *mn = __get_copyup_node(mio, req);
1216 	if (req->state & XS_FAILED){
1220 	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Completion callback for the two-phase copy-on-write.
 * X_COPY reply: the data copy finished, so issue the map-entry write
 * (object_write) for the new object and mark MF_OBJECT_WRITING.
 * X_WRITE reply: the map entry was persisted, so update the cached
 * map_node name/flags from the written data.  On failure both COPYING
 * and WRITING flags are cleared and mio->err is presumably set on an
 * elided line.  The request is always released at the end.
 */
1224 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1226 	struct peerd *peer = pr->peer;
1227 	struct mapperd *mapper = __get_mapperd(peer);
1229 	struct mapper_io *mio = __get_mapper_io(pr);
1230 	struct map_node *mn = __get_copyup_node(mio, req);
1232 		XSEGLOG2(&lc, E, "Cannot get map node");
1235 	__set_copyup_node(mio, req, NULL);
1237 	if (req->state & XS_FAILED){
1238 		XSEGLOG2(&lc, E, "Req failed");
1239 		mn->flags &= ~MF_OBJECT_COPYING;
1240 		mn->flags &= ~MF_OBJECT_WRITING;
1243 	if (req->op == X_WRITE) {
1244 		char *target = xseg_get_target(peer->xseg, req);
1246 		//printf("handle object write replyi\n");
1247 		__set_copyup_node(mio, req, NULL);
1248 		//assert mn->flags & MF_OBJECT_WRITING
1249 		mn->flags &= ~MF_OBJECT_WRITING;
1251 		struct map_node tmp;
1252 		char *data = xseg_get_data(peer->xseg, req);
1253 		map_to_object(&tmp, data);
1254 		mn->flags |= MF_OBJECT_EXIST;
1255 		if (mn->flags != MF_OBJECT_EXIST){
1256 			XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1259 		//assert mn->flags & MF_OBJECT_EXIST
1260 		strncpy(mn->object, tmp.object, tmp.objectlen);
1261 		mn->object[tmp.objectlen] = 0;
1262 		mn->objectlen = tmp.objectlen;
1263 		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1266 	} else if (req->op == X_COPY) {
1267 	//	issue write_object;
1268 		mn->flags &= ~MF_OBJECT_COPYING;
1269 		struct map *map = mn->map;
1271 			XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1275 		/* construct a tmp map_node for writing purposes */
1276 		char *target = xseg_get_target(peer->xseg, req);
1277 		struct map_node newmn = *mn;
1278 		newmn.flags = MF_OBJECT_EXIST;
1279 		strncpy(newmn.object, target, req->targetlen);
1280 		newmn.object[req->targetlen] = 0;
1281 		newmn.objectlen = req->targetlen;
1282 		newmn.objectidx = mn->objectidx; 
1283 		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1285 			XSEGLOG2(&lc, E, "Object write returned error for object %s"
1286 					"\n\t of map %s [%llu]",
1287 					mn->object, map->volume, (unsigned long long) mn->objectidx);
1290 		mn->flags |= MF_OBJECT_WRITING;
1291 		__set_copyup_node (mio, xreq, mn);
1293 		XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1300 	xseg_put_request(peer->xseg, req, pr->portno);
1305 	XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
/* NOTE(review): field of struct r2o (request-to-objects mapping entry);
 * the struct header is elided in this chunk. */
1314 	struct map_node *mn;
/*
 * Translate a client I/O request into the list of (object, offset,
 * size) segments it spans, copying up missing objects on writes,
 * waiting for busy objects, and finally resizing the request to carry
 * an xseg_reply_map back to the client.
 * NOTE(review): the final cleanup loop runs `i < idx` while the reply
 * loop runs `i < idx+1`; unless an elided line puts the last mapnode,
 * one reference may be leaked — confirm against the full source.
 */
1319 static int req2objs(struct peer_req *pr, struct map *map, int write)
1322 	struct peerd *peer = pr->peer;
1323 	struct mapper_io *mio = __get_mapper_io(pr);
1324 	char *target = xseg_get_target(peer->xseg, pr->req);
1325 	uint32_t nr_objs = calc_nr_obj(pr->req);
1326 	uint64_t size = sizeof(struct xseg_reply_map) + 
1327 			nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1328 	uint32_t idx, i, ready;
1329 	uint64_t rem_size, obj_index, obj_offset, obj_size; 
1330 	struct map_node *mn;
1332 	XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1334 	/* get map_nodes of request */
1335 	struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1337 		XSEGLOG2(&lc, E, "Cannot allocate mns");
1341 	rem_size = pr->req->size;
1342 	obj_index = pr->req->offset / block_size;
1343 	obj_offset = pr->req->offset & (block_size -1); //modulo
1344 	obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1345 	mn = get_mapnode(map, obj_index);
1347 		XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1352 	mns[idx].offset = obj_offset;
1353 	mns[idx].size = obj_size;
1354 	rem_size -= obj_size;
1355 	while (rem_size > 0) {
1359 		obj_size = (rem_size > block_size) ? block_size : rem_size;
1360 		rem_size -= obj_size;
1361 		mn = get_mapnode(map, obj_index);
1363 			XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1368 		mns[idx].offset = obj_offset;
1369 		mns[idx].size = obj_size;
1375 	while (ready < (idx + 1)){
1377 		for (i = 0; i < (idx+1); i++) {
1380 			if (mn->flags & MF_OBJECT_NOT_READY) {
1382 				wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1383 				if (mn->flags & MF_OBJECT_DELETED){
1387 					XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1395 			else if (!(mn->flags & MF_OBJECT_EXIST)) {
1396 				//calc new_target, copy up object
1397 				if (copyup_object(peer, mn, pr) == NULL){
1398 					XSEGLOG2(&lc, E, "Error in copy up object");
1411 	while(mio->copyups > 0){
1412 		mio->cb = copyup_cb;
1415 		st_cond_wait(pr->cond);
1422 		XSEGLOG2(&lc, E, "Mio->err");
1426 	/* resize request to fit reply */
1427 	char buf[XSEG_MAX_TARGETLEN];
1428 	strncpy(buf, target, pr->req->targetlen);
1429 	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1431 		XSEGLOG2(&lc, E, "Cannot resize request");
1434 	target = xseg_get_target(peer->xseg, pr->req);
1435 	strncpy(target, buf, pr->req->targetlen);
1437 	/* structure reply */
1438 	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1439 	reply->cnt = nr_objs;
1440 	for (i = 0; i < (idx+1); i++) {
1441 		strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1442 		reply->segs[i].targetlen = mns[i].mn->objectlen;
1443 		reply->segs[i].offset = mns[i].offset;
1444 		reply->segs[i].size = mns[i].size;
1447 	for (i = 0; i < idx; i++) {
1448 		put_mapnode(mns[i].mn);
/*
 * do_dropcache: evict a map from the in-memory cache. Waits for pending
 * operations on every cached object, marks objects destroyed, removes
 * the map from the mapper hash and drops the creation reference so the
 * map is freed. Lines missing from this extract elide error checks and
 * put_mapnode calls.
 */
1454 static int do_dropcache(struct peer_req *pr, struct map *map)
1456 struct map_node *mn;
1457 struct peerd *peer = pr->peer;
1458 struct mapperd *mapper = __get_mapperd(peer);
1460 XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1461 map->flags |= MF_MAP_DROPPING_CACHE;
1462 for (i = 0; i < calc_map_obj(map); i++) {
1463 mn = get_mapnode(map, i);
1465 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1466 //make sure all pending operations on all objects are completed
1467 if (mn->flags & MF_OBJECT_NOT_READY){
1468 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
/* NOTE(review): `&=` with a single flag clears every OTHER flag and
 * keeps DESTROYED only if it was already set; setting the flag would be
 * `|=`. Looks like a bug — verify intent against the full source. */
1470 mn->flags &= MF_OBJECT_DESTROYED;
1475 map->flags &= ~MF_MAP_DROPPING_CACHE;
1476 map->flags |= MF_MAP_DESTROYED;
1477 remove_map(mapper, map);
1478 XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1479 put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
/*
 * do_info: answer an X_INFO request by writing the map's logical size
 * into the xseg_reply_info structure in the request's data area.
 * (Return statement not visible in this extract.)
 */
1483 static int do_info(struct peer_req *pr, struct map *map)
1485 struct peerd *peer = pr->peer;
1486 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1487 xinfo->size = map->size;
/*
 * do_close: release a map. If the map was opened exclusively, the
 * (elided) branch presumably relinquishes the exclusive lock — confirm
 * in the full source — then the cached state is dropped.
 */
1492 static int do_close(struct peer_req *pr, struct map *map)
1494 // struct peerd *peer = pr->peer;
1495 // struct xseg_request *req;
1496 if (map->flags & MF_MAP_EXCLUSIVE)
1498 return do_dropcache(pr, map);
/*
 * do_destroy: destroy a volume. First deletes the map object itself
 * (synchronously, waiting on the xseg request), then deletes every data
 * object the map references, waiting for pending operations per object
 * and for outstanding deletions via deletion_cb. Finally closes the map.
 * Error paths and loop closers are elided in this extract.
 */
1501 static int do_destroy(struct peer_req *pr, struct map *map)
1504 struct peerd *peer = pr->peer;
1505 struct mapper_io *mio = __get_mapper_io(pr);
1506 struct map_node *mn;
1507 struct xseg_request *req;
1509 XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1510 map->flags |= MF_MAP_DELETING;
1511 req = delete_map(pr, map);
/* Block until the map-deletion request either fails or is served. */
1514 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1515 if (req->state & XS_FAILED){
1516 xseg_put_request(peer->xseg, req, pr->portno);
1517 map->flags &= ~MF_MAP_DELETING;
1520 xseg_put_request(peer->xseg, req, pr->portno);
1522 uint64_t nr_obj = calc_map_obj(map);
1523 uint64_t deleted = 0;
1524 while (deleted < nr_obj){
1526 for (i = 0; i < nr_obj; i++){
1527 mn = get_mapnode(map, i);
1529 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1530 if (mn->flags & MF_OBJECT_EXIST){
1531 //make sure all pending operations on all objects are completed
1532 if (mn->flags & MF_OBJECT_NOT_READY){
1533 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1535 req = delete_object(pr, mn);
1537 if (mio->del_pending){
/* NOTE(review): same `&=` vs `|=` suspicion as in do_dropcache —
 * `&= MF_OBJECT_DESTROYED` clears other flags rather than setting
 * DESTROYED. Verify against the full source. */
1546 mn->flags &= MF_OBJECT_DESTROYED;
/* Wait for all in-flight object deletions; deletion_cb wakes us. */
1553 mio->cb = deletion_cb;
1554 wait_on_pr(pr, mio->del_pending > 0);
1557 map->flags &= ~MF_MAP_DELETING;
1558 XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1559 return do_close(pr, map);
/*
 * do_mapr: handle a map-read request — delegate to req2objs with
 * write=0, log success/failure, and (at debug level) dump the resulting
 * scatterlist entries. Early-return error lines are elided here.
 */
1562 static int do_mapr(struct peer_req *pr, struct map *map)
1564 struct peerd *peer = pr->peer;
1565 int r = req2objs(pr, map, 0);
1567 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1569 (unsigned long long) pr->req->offset,
1570 (unsigned long long) (pr->req->offset + pr->req->size));
1573 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1575 (unsigned long long) pr->req->offset,
1576 (unsigned long long) (pr->req->offset + pr->req->size));
1577 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1578 (unsigned long long) pr->req->offset,
1579 (unsigned long long) pr->req->size);
/* Debug dump of each reply segment; buf is explicitly NUL-terminated
 * after strncpy since object names may be XSEG_MAX_TARGETLEN long. */
1580 char buf[XSEG_MAX_TARGETLEN+1];
1581 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1583 for (i = 0; i < reply->cnt; i++) {
1584 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1585 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1586 buf[reply->segs[i].targetlen] = 0;
1587 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1588 (unsigned long long) reply->segs[i].offset,
1589 (unsigned long long) reply->segs[i].size);
/*
 * do_mapw: handle a map-write request — delegate to req2objs with
 * write=1 (which copies up missing objects), log success/failure, and
 * (at debug level) dump the resulting scatterlist entries. Mirrors
 * do_mapr; early-return error lines are elided here.
 */
1594 static int do_mapw(struct peer_req *pr, struct map *map)
1596 struct peerd *peer = pr->peer;
1597 int r = req2objs(pr, map, 1);
1599 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1601 (unsigned long long) pr->req->offset,
1602 (unsigned long long) (pr->req->offset + pr->req->size));
1605 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1607 (unsigned long long) pr->req->offset,
1608 (unsigned long long) (pr->req->offset + pr->req->size));
1609 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1610 (unsigned long long) pr->req->offset,
1611 (unsigned long long) pr->req->size);
/* Debug dump of each reply segment (same pattern as do_mapr). */
1612 char buf[XSEG_MAX_TARGETLEN+1];
1613 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1615 for (i = 0; i < reply->cnt; i++) {
1616 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1617 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1618 buf[reply->segs[i].targetlen] = 0;
1619 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1620 (unsigned long long) reply->segs[i].offset,
1621 (unsigned long long) reply->segs[i].size);
1626 //here map is the parent map
/*
 * do_clone: create a clone volume from a parent map (`map` is the
 * parent). Builds a new map of the requested size, pointing each object
 * either at the parent's object (shared, copy-on-write) or at the zero
 * block where the parent has no object, then writes the new map out.
 * Several original lines (existence check, error handling, cleanup) are
 * elided in this extract — the embedded FIXME notes some of them.
 */
1627 static int do_clone(struct peer_req *pr, struct map *map)
1630 FIXME check if clone map exists
1631 clonemap = get_map(pr, target, targetlen, MF_LOAD);
1633 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1634 // cache on non-exclusive opens or declare a NO_CACHE flag ?
1639 char buf[XSEG_MAX_TARGETLEN];
1640 struct peerd *peer = pr->peer;
1641 struct mapperd *mapper = __get_mapperd(peer);
1642 char *target = xseg_get_target(peer->xseg, pr->req);
1643 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1644 XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1645 struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
/* size == -1 means "inherit the parent's size"; a clone smaller than
 * the parent is rejected with a warning (error return elided). */
1650 if (xclone->size == -1)
1651 clonemap->size = map->size;
1653 clonemap->size = xclone->size;
1654 if (clonemap->size < map->size){
1655 target = xseg_get_target(peer->xseg, pr->req);
1656 strncpy(buf, target, pr->req->targetlen);
1657 buf[pr->req->targetlen] = 0;
1658 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1659 "\n\t for requested clone %s",
1660 (unsigned long long) xclone->size,
1661 (unsigned long long) map->size, buf);
1665 //alloc and init map_nodes
/* NOTE(review): size/block_size + 1 over-counts by one when size is an
 * exact multiple of block_size — compare with the ceil-division used in
 * handle_clone; verify against the full source. */
1666 unsigned long c = clonemap->size/block_size + 1;
1667 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1672 for (i = 0; i < clonemap->size/block_size + 1; i++) {
/* Objects present in the parent are shared by name; objects beyond the
 * parent (or absent) start as the zero block. */
1673 struct map_node *mn = get_mapnode(map, i);
1675 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1676 map_nodes[i].objectlen = mn->objectlen;
1679 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1680 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1682 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1683 map_nodes[i].flags = 0;
1684 map_nodes[i].objectidx = i;
1685 map_nodes[i].map = clonemap;
1686 map_nodes[i].ref = 1;
1687 map_nodes[i].waiters = 0;
1688 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1689 r = insert_object(clonemap, &map_nodes[i]);
1691 XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
/* Persist the freshly built clone map. */
1695 r = write_map(pr, clonemap);
1697 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
/*
 * open_load_map: optionally acquire the map exclusively (open_map),
 * then load its contents. With MF_FORCE the (elided) branch presumably
 * proceeds despite a failed exclusive open; on load failure an opened
 * map is released. Confirm elided branches against the full source.
 */
1707 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1710 if (flags & MF_EXCLUSIVE){
1711 r = open_map(pr, map, flags);
1713 if (flags & MF_FORCE){
1720 r = load_map(pr, map);
1721 if (r < 0 && opened){
/*
 * get_map: look up a map in the mapper cache by name; on a miss with
 * MF_LOAD set, create it and open/load it (dropping the cache again on
 * failure). A cached map flagged MF_MAP_DESTROYED is handled by the
 * elided branch. Returns the map (reference semantics per find_map_len/
 * create_map — elided lines, verify in full source) or an error value.
 */
1727 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1731 struct peerd *peer = pr->peer;
1732 struct mapperd *mapper = __get_mapperd(peer);
1733 struct map *map = find_map_len(mapper, name, namelen, flags);
1735 if (flags & MF_LOAD){
1736 map = create_map(mapper, name, namelen, flags);
1739 r = open_load_map(pr, map, flags);
1741 do_dropcache(pr, map);
1747 } else if (map->flags & MF_MAP_DESTROYED){
/*
 * map_action: common driver for all map operations — fetch the map by
 * name, wait until it has no pending state (MF_MAP_NOT_READY), run the
 * given action callback, and drop the cache afterwards when the map was
 * not held exclusively.
 */
1755 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1756 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1758 //struct peerd *peer = pr->peer;
1761 map = get_map(pr, name, namelen, flags);
1764 if (map->flags & MF_MAP_NOT_READY){
1765 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1769 int r = action(pr, map);
1770 //always drop cache if map not opened exclusively
1771 if (!(map->flags & MF_MAP_EXCLUSIVE))
1772 do_dropcache(pr, map);
/*
 * handle_info: state-thread entry for X_INFO — run do_info on the map
 * named by the request target. (Flags argument and completion lines are
 * elided in this extract.)
 */
1778 void * handle_info(struct peer_req *pr)
1780 struct peerd *peer = pr->peer;
1781 char *target = xseg_get_target(peer->xseg, pr->req);
1782 int r = map_action(do_info, pr, target, pr->req->targetlen,
/*
 * handle_clone: state-thread entry for X_CLONE. Two paths:
 *  - xclone->targetlen set: clone from an existing (pithos) volume via
 *    do_clone;
 *  - otherwise: create a brand-new empty volume of xclone->size, with
 *    every object pointing at the zero block, and persist the map.
 * Error paths and completion lines are elided in this extract.
 */
1792 void * handle_clone(struct peer_req *pr)
1795 struct peerd *peer = pr->peer;
1796 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1801 if (xclone->targetlen){
1802 //support clone only from pithos
1803 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
/* Creation path: refuse if a volume of that name already exists. */
1811 char *target = xseg_get_target(peer->xseg, pr->req);
1812 XSEGLOG2(&lc, I, "Creating volume");
1813 map = get_map(pr, target, pr->req->targetlen,
1816 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1817 if (map->ref <= 2) //initial one + one ref from __get_map
1818 do_dropcache(pr, map); //we are the only ones using this map. Drop the cache.
1819 put_map(map); //matches get_map
1823 //create a new empty map of size
1824 map = create_map(mapper, target, pr->req->targetlen,
1830 map->size = xclone->size;
1831 //populate_map with zero objects;
/* ceil(size / block_size) objects */
1832 uint64_t nr_objs = xclone->size / block_size;
1833 if (xclone->size % block_size)
1836 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1838 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
/* Initialize every object as a reference to the shared zero block. */
1843 for (i = 0; i < nr_objs; i++) {
1844 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1845 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1846 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1847 map_nodes[i].flags = 0;
1848 map_nodes[i].objectidx = i;
1849 map_nodes[i].map = map;
1850 map_nodes[i].ref = 1;
1851 map_nodes[i].waiters = 0;
1852 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1853 r = insert_object(map, &map_nodes[i]);
1855 do_dropcache(pr, map);
/* Persist the new map; on failure the cache is dropped so no stale
 * volume lingers in memory. */
1860 r = write_map(pr, map);
1862 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1863 do_dropcache(pr, map);
1866 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1868 do_dropcache(pr, map); //drop cache here for consistency
/*
 * handle_mapr: state-thread entry for X_MAPR — run do_mapr on the
 * request target, loading the map exclusively from the archip namespace.
 */
1880 void * handle_mapr(struct peer_req *pr)
1882 struct peerd *peer = pr->peer;
1883 char *target = xseg_get_target(peer->xseg, pr->req);
1884 int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1885 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/*
 * handle_mapw: state-thread entry for X_MAPW — like handle_mapr but
 * additionally passes MF_FORCE so the exclusive open can be forced.
 */
1894 void * handle_mapw(struct peer_req *pr)
1896 struct peerd *peer = pr->peer;
1897 char *target = xseg_get_target(peer->xseg, pr->req);
1898 int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1899 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1904 XSEGLOG2(&lc, D, "Ta: %d", ta);
/*
 * handle_destroy: state-thread entry for X_DELETE — run do_destroy on
 * the request target with a forced exclusive load.
 */
1909 void * handle_destroy(struct peer_req *pr)
1911 struct peerd *peer = pr->peer;
1912 char *target = xseg_get_target(peer->xseg, pr->req);
1913 int r = map_action(do_destroy, pr, target, pr->req->targetlen,
1914 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
/*
 * handle_close: state-thread entry for X_CLOSE — run do_close on the
 * request target. Deliberately omits MF_LOAD: closing must not load an
 * uncached map.
 */
1923 void * handle_close(struct peer_req *pr)
1925 struct peerd *peer = pr->peer;
1926 char *target = xseg_get_target(peer->xseg, pr->req);
1927 //here we do not want to load
1928 int r = map_action(do_close, pr, target, pr->req->targetlen,
1929 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
/*
 * dispatch_accepted: map a newly accepted xseg request to its handler
 * and spawn a state-thread to run it. Unknown ops log to stderr (the
 * elided default path presumably fails the request — verify).
 * NOTE(review): "unknown up" in the message looks like a typo for
 * "unknown op" (left unchanged: runtime string).
 */
1938 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
1939 struct xseg_request *req)
1941 //struct mapperd *mapper = __get_mapperd(peer);
1942 struct mapper_io *mio = __get_mapper_io(pr);
1943 void *(*action)(struct peer_req *) = NULL;
1945 mio->state = ACCEPTED;
1948 switch (pr->req->op) {
1949 /* primary xseg operations of mapper */
1950 case X_CLONE: action = handle_clone; break;
1951 case X_MAPR: action = handle_mapr; break;
1952 case X_MAPW: action = handle_mapw; break;
1953 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1954 case X_INFO: action = handle_info; break;
1955 case X_DELETE: action = handle_destroy; break;
1956 case X_CLOSE: action = handle_close; break;
1957 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/* Run the handler on its own state-thread so it may block on conds. */
1962 st_thread_create(action, pr, 0, 0);
/*
 * dispatch: peer entry point for all request events; only freshly
 * accepted requests are visibly handled here (forwarded to
 * dispatch_accepted). Handling of replies/other reasons is elided in
 * this extract.
 */
1968 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1969 enum dispatch_reason reason)
1971 struct mapperd *mapper = __get_mapperd(peer);
1973 struct mapper_io *mio = __get_mapper_io(pr);
1977 if (reason == dispatch_accept)
1978 dispatch_accepted(peer, pr, req);
/*
 * custom_peer_init: per-peer initialization — allocate the mapperd
 * state and per-request mapper_io structures, parse command-line
 * options (-bp blocker port, -mbp map-blocker port, -t thread count,
 * which must be 1), and raise scheduling priority to SCHED_FIFO.
 * The commented-out libgcrypt code is the disabled sha256 machinery
 * for hashed object names.
 */
1989 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1992 // unsigned char buf[SHA256_DIGEST_SIZE];
1993 // unsigned char *zero;
1995 //gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1997 /* Version check should be the very first call because it
1998 makes sure that important subsystems are intialized. */
1999 // gcry_check_version (NULL);
2001 /* Disable secure memory. */
2002 // gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
2004 /* Tell Libgcrypt that initialization has completed. */
2005 // gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
2007 /* calculate out magic sha hash value */
2008 // gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
2010 /* calculate zero block */
2011 //FIXME check hash value
2012 // zero = malloc(block_size);
2013 // memset(zero, 0, block_size);
2014 // gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
2015 // for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
2016 // sprintf(zero_block + 2*i, "%02x", buf[i]);
2017 // printf("%s \n", zero_block);
2020 //FIXME error checks
/* NOTE(review): malloc results below are unchecked (the FIXME above
 * acknowledges it); also `mapper` vs `mapperd` — the aliasing line is
 * elided in this extract, verify it exists in the full source. */
2021 struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2022 peer->priv = mapperd;
2024 mapper->hashmaps = xhash_new(3, STRING);
/* One mapper_io (copyup tracking hash, callbacks) per peer request. */
2026 for (i = 0; i < peer->nr_ops; i++) {
2027 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2028 mio->copyups_nodes = xhash_new(3, INTEGER);
2032 peer->peer_reqs[i].priv = mio;
2035 for (i = 0; i < argc; i++) {
2036 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2037 mapper->bportno = atoi(argv[i+1]);
2041 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2042 mapper->mbportno = atoi(argv[i+1]);
2046 /* enforce only one thread */
2047 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2048 int t = atoi(argv[i+1]);
2050 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
/* Real-time scheduling for the single mapper thread.
 * NOTE(review): sched_setscheduler takes a pid, and gettid != pid in a
 * multithreaded process only; return value is unchecked — verify. */
2058 const struct sched_param param = { .sched_priority = 99 };
2059 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
/*
 * print_obj: debug helper — dump one map node's index, name, name
 * length and existence flag to stderr.
 */
2067 void print_obj(struct map_node *mn)
2069 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
2070 (unsigned long long) mn->objectidx, mn->object,
2071 (unsigned int) mn->objectlen,
2072 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * print_map: debug helper — dump a map's header (name, size, object
 * count computed as ceil(size / block_size)) and every object in it.
 * Bails out on implausibly large object counts as a sanity guard.
 */
2075 void print_map(struct map *m)
2077 uint64_t nr_objs = m->size/block_size;
2078 if (m->size % block_size)
2080 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
2081 m->volume, m->volumelen,
2082 (unsigned long long) m->size,
2083 (unsigned long long) nr_objs);
2085 struct map_node *mn;
2086 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2088 for (i = 0; i < nr_objs; i++) {
2089 mn = find_object(m, i);
2091 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2099 void test_map(struct peerd *peer)
2102 //struct sha256_ctx sha256ctx;
2103 unsigned char buf[SHA256_DIGEST_SIZE];
2104 char buf_new[XSEG_MAX_TARGETLEN + 20];
2105 struct map *m = malloc(sizeof(struct map));
2106 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2107 m->volume[XSEG_MAX_TARGETLEN] = 0;
2108 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2109 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2110 m->volumelen = XSEG_MAX_TARGETLEN;
2111 m->size = 100*block_size;
2112 m->objects = xhash_new(3, INTEGER);
2113 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2114 for (i = 0; i < 100; i++) {
2115 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2116 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2118 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2119 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2121 map_node[i].objectidx = i;
2122 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2123 map_node[i].flags = MF_OBJECT_EXIST;
2124 ret = insert_object(m, &map_node[i]);
2127 char *data = malloc(block_size);
2128 mapheader_to_map(m, data);
2129 uint64_t pos = mapheader_size;
2131 for (i = 0; i < 100; i++) {
2132 map_node = find_object(m, i);
2134 printf("no object node %d \n", i);
2137 object_to_map(data+pos, map_node);
2138 pos += objectsize_in_map;
2142 struct map *m2 = malloc(sizeof(struct map));
2143 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2144 m->volume[XSEG_MAX_TARGETLEN] = 0;
2145 m->volumelen = XSEG_MAX_TARGETLEN;
2147 m2->objects = xhash_new(3, INTEGER);
2148 ret = read_map(peer, m2, data);
2151 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2153 while (sum < block_size) {
2154 r = write(fd, data + sum, block_size -sum);
2157 printf("write error\n");
2163 map_node = find_object(m, 0);