8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
15 #include <sys/syscall.h>
16 #include <openssl/sha.h>
/* Option flags accepted by mapper operations.
 * MF_ARCHIP selects the "archip_"-prefixed naming scheme (used by
 * find_map_len and create_map); MF_FORCE suppresses XF_NOSYNC in __open_map.
 * NOTE(review): exact semantics of MF_LOAD / MF_EXCLUSIVE are not visible
 * in this chunk — confirm against the dispatch code. */
18 /* general mapper flags */
19 #define MF_LOAD (1 << 0)
20 #define MF_EXCLUSIVE (1 << 1)
21 #define MF_FORCE (1 << 2)
22 #define MF_ARCHIP (1 << 3)
/* Fallback: define the raw SHA-256 digest size (32 bytes) if the
 * OpenSSL headers did not already provide this name. */
24 #ifndef SHA256_DIGEST_SIZE
25 #define SHA256_DIGEST_SIZE 32
27 /* hex representation of sha256 value takes up double the sha256 size */
28 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
/* Data/map block granularity: 4 MiB. */
30 #define block_size (1<<22) //FIXME this should be defined here?
/* On-disk map entry: 1 "exists" byte + the raw SHA-256 of the object name
 * (see object_to_map / map_to_object). */
32 /* transparency byte + max object len in disk */
33 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
/* Header layout: u32 version followed by u64 size — must match
 * mapheader_to_map and read_map. */
35 /* Map header contains:
39 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
/* All archipelago-owned targets are namespaced with this prefix. */
42 #define MAPPER_PREFIX "archip_"
43 #define MAPPER_PREFIX_LEN 7
45 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
/* Compile-time guards: a full volume or object name must fit in an
 * xseg target field. */
48 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49 #error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
52 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
54 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55 #error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
/* Largest addressable volume: number of entries that fit in one map
 * block, times the data block size. */
58 #define MAX_VOLUME_SIZE \
59 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
/* Hex name of the all-zeroes object; objects matching this are never
 * copied up (see copyup_object's zero_block short-circuit). */
62 char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
65 /* dispatch_internal mapper states */
74 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
77 /* mapper object flags */
78 #define MF_OBJECT_EXIST (1 << 0)
79 #define MF_OBJECT_COPYING (1 << 1)
80 #define MF_OBJECT_WRITING (1 << 2)
81 #define MF_OBJECT_DELETING (1 << 3)
82 #define MF_OBJECT_DELETED (1 << 4)
83 #define MF_OBJECT_DESTROYED (1 << 5)
85 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
91 char object[MAX_OBJECT_LEN + 1]; /* NULL terminated string */
99 #define wait_on_pr(__pr, __condition__) \
100 while (__condition__){ \
102 __get_mapper_io(pr)->active = 0;\
103 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
104 st_cond_wait(__pr->cond); \
107 #define wait_on_mapnode(__mn, __condition__) \
108 while (__condition__){ \
111 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
112 ta: %u", __mn, __mn->object, __mn->waiters, ta); \
113 st_cond_wait(__mn->cond); \
116 #define wait_on_map(__map, __condition__) \
117 while (__condition__){ \
120 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
121 __map, __map->volume, __map->waiters, ta); \
122 st_cond_wait(__map->cond); \
125 #define signal_pr(__pr) \
127 if (!__get_mapper_io(pr)->active){\
129 XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta);\
130 __get_mapper_io(pr)->active = 1;\
131 st_cond_signal(__pr->cond); \
135 #define signal_map(__map) \
137 if (__map->waiters) { \
139 XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
140 ta: %u", __map, __map->volume, __map->waiters, ta); \
142 st_cond_signal(__map->cond); \
146 #define signal_mapnode(__mn) \
148 if (__mn->waiters) { \
149 ta += __mn->waiters; \
150 XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
151 %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
153 st_cond_broadcast(__mn->cond); \
159 #define MF_MAP_LOADING (1 << 0)
160 #define MF_MAP_DESTROYED (1 << 1)
161 #define MF_MAP_WRITING (1 << 2)
162 #define MF_MAP_DELETING (1 << 3)
163 #define MF_MAP_DROPPING_CACHE (1 << 4)
164 #define MF_MAP_EXCLUSIVE (1 << 5)
165 #define MF_MAP_OPENING (1 << 6)
166 #define MF_MAP_CLOSING (1 << 7)
168 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
176 char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
177 xhash_t *objects; /* obj_index --> map_node */
184 xport bportno; /* blocker that accesses data */
185 xport mbportno; /* blocker that accesses maps */
186 xhash_t *hashmaps; // hash_function(target) --> struct map
190 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
191 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
192 struct map_node *copyup_node;
193 volatile int err; /* error flag */
194 volatile uint64_t del_pending;
198 enum mapper_state state;
203 struct mapperd *mapper;
205 void print_map(struct map *m);
208 void custom_peer_usage()
210 fprintf(stderr, "Custom peer options: \n"
211 "-bp : port for block blocker(!)\n"
212 "-mbp : port for map blocker\n"
/* Per-peer mapper state is stashed in peer->priv. */
221 static inline struct mapperd * __get_mapperd(struct peerd *peer)
223 return (struct mapperd *) peer->priv;
/* Per-request mapper io state is stashed in pr->priv. */
226 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
228 return (struct mapper_io *) pr->priv;
231 static inline uint64_t calc_map_obj(struct map *map)
235 uint64_t nr_objs = map->size / block_size;
236 if (map->size % block_size)
241 static uint32_t calc_nr_obj(struct xseg_request *req)
244 uint64_t rem_size = req->size;
245 uint64_t obj_offset = req->offset & (block_size -1); //modulo
246 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247 rem_size -= obj_size;
248 while (rem_size > 0) {
249 obj_size = (rem_size > block_size) ? block_size : rem_size;
250 rem_size -= obj_size;
258 * Unsafe. Doesn't check if data length is odd!
/* Render SHA256_DIGEST_LENGTH raw bytes as lowercase hex into hex[].
 * Caller must provide at least 2*SHA256_DIGEST_LENGTH+1 chars — the
 * final sprintf writes a trailing NUL. */
260 static void hexlify(unsigned char *data, char *hex)
263 for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264 sprintf(hex+2*i, "%02x", data[i]);
/* Inverse of hexlify: decode 2*SHA256_DIGEST_LENGTH hex chars back to raw
 * bytes. Interior decoding lines are not visible in this chunk. */
267 static void unhexlify(char *hex, unsigned char *data)
271 for (i=0; i<SHA256_DIGEST_LENGTH; i++){
286 data[i] |= (c << 4) & 0xF0;
304 * Maps handling functions
/* Look up an in-core map by its full (already prefixed) NUL-terminated
 * volume name in mapper->hashmaps. */
307 static struct map * find_map(struct mapperd *mapper, char *volume)
309 struct map *m = NULL;
310 int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
/* Like find_map, but takes a (target, targetlen) pair that need not be
 * NUL-terminated; prepends MAPPER_PREFIX when MF_ARCHIP is set and
 * rejects names longer than MAX_VOLUME_LEN. */
317 static struct map * find_map_len(struct mapperd *mapper, char *target,
318 uint32_t targetlen, uint32_t flags)
320 char buf[XSEG_MAX_TARGETLEN+1];
321 if (flags & MF_ARCHIP){
322 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325 targetlen += MAPPER_PREFIX_LEN;
/* non-ARCHIP path: raw copy (the terminating NUL is set on a line not
 * visible in this chunk) */
328 strncpy(buf, target, targetlen);
332 if (targetlen > MAX_VOLUME_LEN){
333 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334 targetlen, MAX_VOLUME_LEN);
338 XSEGLOG2(&lc, D, "looking up map %s, len %u",
340 return find_map(mapper, buf);
/* Register a map in mapper->hashmaps keyed by its volume name.
 * Refuses duplicates; grows the hash table on -XHASH_ERESIZE and
 * retries the insert until it fits. */
344 static int insert_map(struct mapperd *mapper, struct map *map)
348 if (find_map(mapper, map->volume)){
349 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
353 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
354 map->volume, strlen(map->volume), (unsigned long) map);
355 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356 while (r == -XHASH_ERESIZE) {
357 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
360 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361 (unsigned long long) shift);
364 mapper->hashmaps = new_hashmap;
365 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Unregister a map from mapper->hashmaps, shrinking the hash table on
 * -XHASH_ERESIZE and retrying. Caller must guarantee no pending peer
 * requests reference the map (see assert note below). */
371 static int remove_map(struct mapperd *mapper, struct map *map)
375 //assert no pending pr on map
377 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378 while (r == -XHASH_ERESIZE) {
379 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
382 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383 (unsigned long long) shift);
386 mapper->hashmaps = new_hashmap;
387 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
393 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
397 struct peerd *peer = pr->peer;
398 struct xseg_request *req;
399 struct mapperd *mapper = __get_mapperd(peer);
402 XSEGLOG2(&lc, I, "Closing map %s", map->volume);
404 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
406 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
411 r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
413 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
418 char *reqtarget = xseg_get_target(peer->xseg, req);
421 strncpy(reqtarget, map->volume, req->targetlen);
423 req->size = block_size;
425 r = xseg_set_req_data(peer->xseg, req, pr);
427 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
431 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
433 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
437 r = xseg_signal(peer->xseg, p);
439 XSEGLOG2(&lc, I, "Map %s closing", map->volume);
443 xseg_get_req_data(peer->xseg, req, &dummy);
445 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous close: mark the map MF_MAP_CLOSING, submit the close
 * request via __close_map and block (st_cond via wait_on_pr) until it is
 * served or failed; err captures XS_FAILED. The return statement is not
 * visible in this chunk. */
450 static int close_map(struct peer_req *pr, struct map *map)
453 struct xseg_request *req;
454 struct peerd *peer = pr->peer;
456 map->flags |= MF_MAP_CLOSING;
457 req = __close_map(pr, map);
460 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461 map->flags &= ~MF_MAP_CLOSING;
462 err = req->state & XS_FAILED;
463 xseg_put_request(peer->xseg, req, pr->portno);
470 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
471 char *target, uint32_t targetlen, struct map **m)
473 struct mapperd *mapper = __get_mapperd(peer);
475 *m = find_map(mapper, target, targetlen);
477 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478 if ((*m)->flags & MF_MAP_NOT_READY) {
479 __xq_append_tail(&(*m)->pending, (xqindex) pr);
480 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
482 //} else if ((*m)->flags & MF_MAP_DESTROYED){
486 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
490 r = open_map(peer, pr, target, targetlen, 0);
497 * Object handling functions
/* Fetch the map_node at obj_index from map->objects (lookup-miss
 * handling is on lines not visible in this chunk). */
500 struct map_node *find_object(struct map *map, uint64_t obj_index)
503 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert mn keyed by its objectidx, growing map->objects once on
 * -XHASH_ERESIZE and retrying. Source FIXME: no duplicate check first. */
509 static int insert_object(struct map *map, struct map_node *mn)
511 //FIXME no find object first
512 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513 if (r == -XHASH_ERESIZE) {
514 unsigned long shift = xhash_grow_size_shift(map->objects);
515 map->objects = xhash_resize(map->objects, shift, NULL);
518 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
525 * map read/write functions
/* Deserialize a pithos (version-0) map entry: buf holds the raw SHA-256;
 * the object name becomes its hex form, always flagged as existing. */
527 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
529 hexlify(buf, mn->object);
530 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531 mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532 mn->flags = MF_OBJECT_EXIST;
/* Deserialize an archip (version-1) map entry: buf[0] is the "exists"
 * byte (see object_to_map) and buf+1 the raw SHA-256. Existing objects
 * get MAPPER_PREFIX prepended to the hex name; the other branch keeps
 * the bare hex name. The branch skeleton is not visible in this chunk. */
535 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
540 mn->flags |= MF_OBJECT_EXIST;
541 strcpy(mn->object, MAPPER_PREFIX);
542 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543 mn->object[MAX_OBJECT_LEN] = 0;
544 mn->objectlen = strlen(mn->object);
547 hexlify(buf+1, mn->object);
548 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549 mn->objectlen = strlen(mn->object);
/* Serialize mn into one on-disk entry (objectsize_in_map bytes):
 * 1 "exists" byte followed by the raw SHA-256 decoded from the
 * (possibly MAPPER_PREFIX-prefixed) hex object name. */
554 static inline void object_to_map(char* buf, struct map_node *mn)
556 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
558 /* strip common prefix */
559 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
562 unhexlify(mn->object, (unsigned char *)(buf+1));
/* Serialize the map header at the start of buf: u32 version then u64
 * size — must stay in sync with mapheader_size and read_map. */
566 static inline void mapheader_to_map(struct map *m, char *buf)
569 memcpy(buf + pos, &m->version, sizeof(m->version));
570 pos += sizeof(m->version);
571 memcpy(buf + pos, &m->size, sizeof(m->size));
572 pos += sizeof(m->size);
576 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577 struct map *map, struct map_node *mn)
580 struct mapperd *mapper = __get_mapperd(peer);
581 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582 mapper->mbportno, X_ALLOC);
584 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
586 mn->object, map->volume, (unsigned long long) mn->objectidx);
589 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
591 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
593 mn->object, map->volume, (unsigned long long) mn->objectidx);
596 char *target = xseg_get_target(peer->xseg, req);
597 strncpy(target, map->volume, req->targetlen);
598 req->size = objectsize_in_map;
599 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
601 char *data = xseg_get_data(peer->xseg, req);
602 object_to_map(data, mn);
604 r = xseg_set_req_data(peer->xseg, req, pr);
606 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
608 mn->object, map->volume, (unsigned long long) mn->objectidx);
611 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
613 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
615 mn->object, map->volume, (unsigned long long) mn->objectidx);
618 r = xseg_signal(peer->xseg, p);
620 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
622 XSEGLOG2(&lc, I, "Writing object %s \n\t"
624 mn->object, map->volume, (unsigned long long) mn->objectidx);
629 xseg_get_req_data(peer->xseg, req, &dummy);
631 xseg_put_request(peer->xseg, req, pr->portno);
633 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
635 mn->object, map->volume, (unsigned long long) mn->objectidx);
639 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
642 struct peerd *peer = pr->peer;
643 struct mapperd *mapper = __get_mapperd(peer);
645 uint64_t i, pos, max_objidx = calc_map_obj(map);
646 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647 mapper->mbportno, X_ALLOC);
649 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
652 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653 mapheader_size + max_objidx * objectsize_in_map);
655 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
658 char *target = xseg_get_target(peer->xseg, req);
659 strncpy(target, map->volume, req->targetlen);
660 char *data = xseg_get_data(peer->xseg, req);
661 mapheader_to_map(map, data);
662 pos = mapheader_size;
664 req->size = req->datalen;
667 if (map->size % block_size)
669 for (i = 0; i < max_objidx; i++) {
670 mn = find_object(map, i);
672 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673 (unsigned long long) i, map->volume);
676 object_to_map(data+pos, mn);
677 pos += objectsize_in_map;
679 r = xseg_set_req_data(peer->xseg, req, pr);
681 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
685 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
687 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
691 r = xseg_signal(peer->xseg, p);
693 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
695 map->flags |= MF_MAP_WRITING;
696 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
700 xseg_get_req_data(peer->xseg, req, &dummy);
702 xseg_put_request(peer->xseg, req, pr->portno);
704 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Synchronous whole-map write: set MF_MAP_WRITING (also set inside
 * __write_map), submit, then block until the request is served or
 * failed; the flag is cleared afterwards. */
708 static int write_map(struct peer_req* pr, struct map *map)
711 struct peerd *peer = pr->peer;
712 map->flags |= MF_MAP_WRITING;
713 struct xseg_request *req = __write_map(pr, map);
716 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
717 if (req->state & XS_FAILED)
719 xseg_put_request(peer->xseg, req, pr->portno);
720 map->flags &= ~MF_MAP_WRITING;
724 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
728 struct xseg_request *req;
729 struct peerd *peer = pr->peer;
730 struct mapperd *mapper = __get_mapperd(peer);
733 XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
735 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
737 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
742 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
744 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
749 char *reqtarget = xseg_get_target(peer->xseg, req);
752 strncpy(reqtarget, m->volume, req->targetlen);
754 req->size = block_size;
756 r = xseg_set_req_data(peer->xseg, req, pr);
758 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
762 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
764 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
768 r = xseg_signal(peer->xseg, p);
770 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
774 xseg_get_req_data(peer->xseg, req, &dummy);
776 xseg_put_request(peer->xseg, req, pr->portno);
781 static int read_map (struct map *map, unsigned char *buf)
783 char nulls[SHA256_DIGEST_SIZE];
784 memset(nulls, 0, SHA256_DIGEST_SIZE);
786 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
788 XSEGLOG2(&lc, D, "Read zeros");
792 //type 1, archip type, type 0 pithos map
793 int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
797 struct map_node *map_node;
800 map->version = *(uint32_t *) (buf + pos);
801 pos += sizeof(uint32_t);
802 map->size = *(uint64_t *) (buf + pos);
803 pos += sizeof(uint64_t);
804 nr_objs = map->size / block_size;
805 if (map->size % block_size)
807 map_node = calloc(nr_objs, sizeof(struct map_node));
811 for (i = 0; i < nr_objs; i++) {
812 map_node[i].map = map;
813 map_node[i].objectidx = i;
814 map_node[i].waiters = 0;
816 map_node[i].cond = st_cond_new(); //FIXME err check;
817 map_to_object(&map_node[i], buf + pos);
818 pos += objectsize_in_map;
819 r = insert_object(map, &map_node[i]); //FIXME error check
823 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824 map_node = calloc(max_nr_objs, sizeof(struct map_node));
827 for (i = 0; i < max_nr_objs; i++) {
828 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
830 map_node[i].objectidx = i;
831 map_node[i].map = map;
832 map_node[i].waiters = 0;
834 map_node[i].cond = st_cond_new(); //FIXME err check;
835 pithosmap_to_object(&map_node[i], buf + pos);
836 pos += SHA256_DIGEST_SIZE;
837 r = insert_object(map, &map_node[i]); //FIXME error check
839 map->size = i * block_size;
842 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
845 //FIXME cleanup on error
/* Synchronous map load: submit a read via __load_map, block until the
 * request is served or failed, then parse the returned block with
 * read_map before releasing the request. */
848 static int load_map(struct peer_req *pr, struct map *map)
851 struct xseg_request *req;
852 struct peerd *peer = pr->peer;
853 map->flags |= MF_MAP_LOADING;
854 req = __load_map(pr, map);
857 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
858 map->flags &= ~MF_MAP_LOADING;
859 if (req->state & XS_FAILED){
860 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
861 xseg_put_request(peer->xseg, req, pr->portno);
864 r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
865 xseg_put_request(peer->xseg, req, pr->portno);
869 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
874 struct xseg_request *req;
875 struct peerd *peer = pr->peer;
876 struct mapperd *mapper = __get_mapperd(peer);
879 XSEGLOG2(&lc, I, "Opening map %s", m->volume);
881 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
883 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
888 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
890 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
895 char *reqtarget = xseg_get_target(peer->xseg, req);
898 strncpy(reqtarget, m->volume, req->targetlen);
900 req->size = block_size;
902 if (!(flags & MF_FORCE))
903 req->flags = XF_NOSYNC;
904 r = xseg_set_req_data(peer->xseg, req, pr);
906 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
910 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
912 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
916 r = xseg_signal(peer->xseg, p);
918 XSEGLOG2(&lc, I, "Map %s opening", m->volume);
922 xseg_get_req_data(peer->xseg, req, &dummy);
924 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous map open/acquire: submit via __open_map (flags control
 * XF_NOSYNC, see MF_FORCE), block until served or failed; on success the
 * map is marked MF_MAP_EXCLUSIVE. */
929 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
932 struct xseg_request *req;
933 struct peerd *peer = pr->peer;
935 map->flags |= MF_MAP_OPENING;
936 req = __open_map(pr, map, flags);
939 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
940 map->flags &= ~MF_MAP_OPENING;
941 err = req->state & XS_FAILED;
942 xseg_put_request(peer->xseg, req, pr->portno);
946 map->flags |= MF_MAP_EXCLUSIVE;
/* Track (mn != NULL → xhash_insert) or untrack (mn == NULL →
 * xhash_delete) the map_node associated with an in-flight copyup/write/
 * delete request, resizing mio->copyups_nodes on -XHASH_ERESIZE. */
954 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
958 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
959 if (r == -XHASH_ERESIZE) {
960 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
961 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
964 mio->copyups_nodes = new_hashmap;
965 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
969 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
970 if (r == -XHASH_ERESIZE) {
971 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
972 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
975 mio->copyups_nodes = new_hashmap;
976 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
/* Reverse lookup: the map_node registered for req (miss handling is on
 * lines not visible in this chunk). */
983 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
986 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
992 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
994 struct mapperd *mapper = __get_mapperd(peer);
995 struct mapper_io *mio = __get_mapper_io(pr);
996 struct map *map = mn->map;
1001 uint32_t newtargetlen;
1002 char new_target[MAX_OBJECT_LEN + 1];
1003 unsigned char sha[SHA256_DIGEST_SIZE];
1005 strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1007 char tmp[XSEG_MAX_TARGETLEN + 1];
1009 strncpy(tmp, map->volume, map->volumelen);
1010 sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1011 tmp[XSEG_MAX_TARGETLEN] = 0;
1012 tmplen = strlen(tmp);
1013 SHA256((unsigned char *)tmp, tmplen, sha);
1014 hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1015 newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1018 if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1019 goto copyup_zeroblock;
1021 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1022 mapper->bportno, X_ALLOC);
1024 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1027 r = xseg_prep_request(peer->xseg, req, newtargetlen,
1028 sizeof(struct xseg_request_copy));
1030 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1034 char *target = xseg_get_target(peer->xseg, req);
1035 strncpy(target, new_target, req->targetlen);
1037 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1038 strncpy(xcopy->target, mn->object, mn->objectlen);
1039 xcopy->targetlen = mn->objectlen;
1042 req->size = block_size;
1044 r = xseg_set_req_data(peer->xseg, req, pr);
1046 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1049 r = __set_copyup_node(mio, req, mn);
1050 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1052 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1055 xseg_signal(peer->xseg, p);
1058 mn->flags |= MF_OBJECT_COPYING;
1059 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1063 r = __set_copyup_node(mio, req, NULL);
1064 xseg_get_req_data(peer->xseg, req, &dummy);
1066 xseg_put_request(peer->xseg, req, pr->portno);
1068 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1072 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1073 "Proceeding in writing the new object in map");
1074 /* construct a tmp map_node for writing purposes */
1075 struct map_node newmn = *mn;
1076 newmn.flags = MF_OBJECT_EXIST;
1077 strncpy(newmn.object, new_target, newtargetlen);
1078 newmn.object[newtargetlen] = 0;
1079 newmn.objectlen = newtargetlen;
1080 newmn.objectidx = mn->objectidx;
1081 req = object_write(peer, pr, map, &newmn);
1082 r = __set_copyup_node(mio, req, mn);
1084 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1085 "\n\t of map %s [%llu]",
1086 mn->object, map->volume, (unsigned long long) mn->objectidx);
1089 mn->flags |= MF_OBJECT_WRITING;
1090 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1094 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
1097 struct peerd *peer = pr->peer;
1098 struct mapperd *mapper = __get_mapperd(peer);
1099 struct mapper_io *mio = __get_mapper_io(pr);
1100 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1101 mapper->bportno, X_ALLOC);
1102 XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1104 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1107 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1109 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1112 char *target = xseg_get_target(peer->xseg, req);
1113 strncpy(target, mn->object, req->targetlen);
1115 req->size = req->datalen;
1117 r = xseg_set_req_data(peer->xseg, req, pr);
1119 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1122 __set_copyup_node(mio, req, mn);
1123 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1125 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1128 r = xseg_signal(peer->xseg, p);
1129 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1133 xseg_get_req_data(peer->xseg, req, &dummy);
1135 xseg_put_request(peer->xseg, req, pr->portno);
1137 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1141 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1144 struct peerd *peer = pr->peer;
1145 struct mapperd *mapper = __get_mapperd(peer);
1146 struct mapper_io *mio = __get_mapper_io(pr);
1147 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1148 mapper->mbportno, X_ALLOC);
1149 XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1151 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1154 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1156 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1159 char *target = xseg_get_target(peer->xseg, req);
1160 strncpy(target, map->volume, req->targetlen);
1162 req->size = req->datalen;
1164 r = xseg_set_req_data(peer->xseg, req, pr);
1166 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1169 __set_copyup_node(mio, req, NULL);
1170 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1172 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1175 r = xseg_signal(peer->xseg, p);
1176 map->flags |= MF_MAP_DELETING;
1177 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1181 xseg_get_req_data(peer->xseg, req, &dummy);
1183 xseg_put_request(peer->xseg, req, pr->portno);
1185 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
/* Reference-counted map_node accessors. get_mapnode resolves the node by
 * index (the refcount bump is on lines not visible here); put_mapnode
 * destroys the node's condition variable when its last reference drops. */
1190 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1192 struct map_node *mn = find_object(map, index);
1198 static inline void put_mapnode(struct map_node *mn)
1203 st_cond_destroy(mn->cond);
1207 static inline void __get_map(struct map *map)
/* Drop a reference to the whole map; on the final put, wait out any
 * busy objects, then release both per-node references (the init ref and
 * the get_mapnode ref) before freeing the map itself. */
1212 static inline void put_map(struct map *map)
1214 struct map_node *mn;
1217 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1220 for (i = 0; i < calc_map_obj(map); i++) {
1221 mn = get_mapnode(map, i);
1223 //make sure all pending operations on all objects are completed
1224 if (mn->flags & MF_OBJECT_NOT_READY){
1225 //this should never happen...
1226 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
/* NOTE(review): `&= MF_OBJECT_DESTROYED` clears every flag and does NOT
 * set DESTROYED — `|=` looks intended; confirm upstream before fixing. */
1228 mn->flags &= MF_OBJECT_DESTROYED;
1229 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1230 put_mapnode(mn); //matcing get_mapnode;
1231 //assert mn->ref == 0;
1234 mn = find_object(map, 0);
1237 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
/* Allocate and initialize an in-core map named `name` (MAPPER_PREFIX is
 * prepended and version set to 1 when MF_ARCHIP is given; otherwise the
 * raw name with version 0 = pithos), create its object hash table and
 * condition variable, and register it with the mapper. On insert failure
 * the object hash is freed (cleanup path). */
1242 static struct map * create_map(struct mapperd *mapper, char *name,
1243 uint32_t namelen, uint32_t flags)
/* Length guard always accounts for the prefix, even when MF_ARCHIP is
 * not set — conservative but visible in source. */
1246 if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1247 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1248 namelen, MAX_VOLUME_LEN);
1251 struct map *m = malloc(sizeof(struct map));
1253 XSEGLOG2(&lc, E, "Cannot allocate map ");
1257 if (flags & MF_ARCHIP){
1258 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1259 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1260 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1261 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1262 m->version = 1; /* keep this hardcoded for now */
1265 strncpy(m->volume, name, namelen);
1266 m->volume[namelen] = 0;
1267 m->volumelen = namelen;
1268 m->version = 0; /* version 0 should be pithos maps */
1271 m->objects = xhash_new(3, INTEGER);
1273 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1279 m->cond = st_cond_new(); //FIXME err check;
1280 r = insert_map(mapper, m);
1282 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1289 xhash_free(m->objects);
1291 XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1299 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1301 struct peerd *peer = pr->peer;
1302 struct mapperd *mapper = __get_mapperd(peer);
1304 struct mapper_io *mio = __get_mapper_io(pr);
1305 struct map_node *mn = __get_copyup_node(mio, req);
1308 if (req->state & XS_FAILED){
1312 xseg_put_request(peer->xseg, req, pr->portno);
1316 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1318 struct peerd *peer = pr->peer;
1319 struct mapperd *mapper = __get_mapperd(peer);
1321 struct mapper_io *mio = __get_mapper_io(pr);
1322 struct map_node *mn = __get_copyup_node(mio, req);
1324 XSEGLOG2(&lc, E, "Cannot get map node");
1327 __set_copyup_node(mio, req, NULL);
1329 if (req->state & XS_FAILED){
1330 XSEGLOG2(&lc, E, "Req failed");
1331 mn->flags &= ~MF_OBJECT_COPYING;
1332 mn->flags &= ~MF_OBJECT_WRITING;
1335 if (req->op == X_WRITE) {
1336 char *target = xseg_get_target(peer->xseg, req);
1338 //printf("handle object write replyi\n");
1339 __set_copyup_node(mio, req, NULL);
1340 //assert mn->flags & MF_OBJECT_WRITING
1341 mn->flags &= ~MF_OBJECT_WRITING;
1343 struct map_node tmp;
1344 char *data = xseg_get_data(peer->xseg, req);
1345 map_to_object(&tmp, (unsigned char *) data);
1346 mn->flags |= MF_OBJECT_EXIST;
1347 if (mn->flags != MF_OBJECT_EXIST){
1348 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1351 //assert mn->flags & MF_OBJECT_EXIST
1352 strncpy(mn->object, tmp.object, tmp.objectlen);
1353 mn->object[tmp.objectlen] = 0;
1354 mn->objectlen = tmp.objectlen;
1355 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1358 } else if (req->op == X_COPY) {
1359 // issue write_object;
1360 mn->flags &= ~MF_OBJECT_COPYING;
1361 struct map *map = mn->map;
1363 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1367 /* construct a tmp map_node for writing purposes */
1368 char *target = xseg_get_target(peer->xseg, req);
1369 struct map_node newmn = *mn;
1370 newmn.flags = MF_OBJECT_EXIST;
1371 strncpy(newmn.object, target, req->targetlen);
1372 newmn.object[req->targetlen] = 0;
1373 newmn.objectlen = req->targetlen;
1374 newmn.objectidx = mn->objectidx;
1375 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1377 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1378 "\n\t of map %s [%llu]",
1379 mn->object, map->volume, (unsigned long long) mn->objectidx);
1382 mn->flags |= MF_OBJECT_WRITING;
1383 __set_copyup_node (mio, xreq, mn);
1385 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1392 xseg_put_request(peer->xseg, req, pr->portno);
1397 XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1406 struct map_node *mn;
/*
 * req2objs: translate the byte range of a client request (pr->req->offset,
 * pr->req->size) on `map` into per-object (target, offset, size) segments,
 * wait until every involved map node is ready (issuing copy-ups for objects
 * that do not yet exist when needed), then resize the request and fill in a
 * struct xseg_reply_map scatterlist as the reply payload.
 *
 * `write` selects write semantics (copy-up path) — assumed from the callers
 * do_mapr()/do_mapw(); body is partially elided here.
 * Returns 0 on success, negative on error (per callers' checks).
 */
1411 static int req2objs(struct peer_req *pr, struct map *map, int write)
1414 struct peerd *peer = pr->peer;
1415 struct mapper_io *mio = __get_mapper_io(pr);
1416 char *target = xseg_get_target(peer->xseg, pr->req);
1417 uint32_t nr_objs = calc_nr_obj(pr->req);
/* reply buffer: header plus one scatterlist entry per object touched */
1418 uint64_t size = sizeof(struct xseg_reply_map) +
1419 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1420 uint32_t idx, i, ready;
1421 uint64_t rem_size, obj_index, obj_offset, obj_size;
1422 struct map_node *mn;
1424 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1426 /* get map_nodes of request */
1427 struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1429 XSEGLOG2(&lc, E, "Cannot allocate mns");
/* First segment: may start mid-object, so clamp its size to the object end. */
1433 rem_size = pr->req->size;
1434 obj_index = pr->req->offset / block_size;
1435 obj_offset = pr->req->offset & (block_size -1); //modulo (block_size is a power of two)
1436 obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1437 mn = get_mapnode(map, obj_index);
1439 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1444 mns[idx].offset = obj_offset;
1445 mns[idx].size = obj_size;
1446 rem_size -= obj_size;
/* Subsequent segments are object-aligned: offset 0, full block or remainder. */
1447 while (rem_size > 0) {
1451 obj_size = (rem_size > block_size) ? block_size : rem_size;
1452 rem_size -= obj_size;
1453 mn = get_mapnode(map, obj_index);
1455 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1460 mns[idx].offset = obj_offset;
1461 mns[idx].size = obj_size;
/* Poll all idx+1 nodes until every one is ready, blocking on busy ones. */
1467 while (ready < (idx + 1)){
1469 for (i = 0; i < (idx+1); i++) {
1472 if (mn->flags & MF_OBJECT_NOT_READY) {
1474 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1475 if (mn->flags & MF_OBJECT_DELETED){
1479 XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1487 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1488 //calc new_target, copy up object
1489 if (copyup_object(peer, mn, pr) == NULL){
1490 XSEGLOG2(&lc, E, "Error in copy up object");
/* Block this request until all copy-ups issued above have completed. */
1503 while(mio->copyups > 0){
1504 mio->cb = copyup_cb;
1507 st_cond_wait(pr->cond);
1514 XSEGLOG2(&lc, E, "Mio->err");
1518 /* resize request to fit reply */
/* Preserve the target name across the resize, which may move the buffer. */
1519 char buf[XSEG_MAX_TARGETLEN];
1520 strncpy(buf, target, pr->req->targetlen);
1521 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1523 XSEGLOG2(&lc, E, "Cannot resize request");
1526 target = xseg_get_target(peer->xseg, pr->req);
1527 strncpy(target, buf, pr->req->targetlen);
1529 /* structure reply */
1530 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1531 reply->cnt = nr_objs;
1532 for (i = 0; i < (idx+1); i++) {
1533 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1534 reply->segs[i].targetlen = mns[i].mn->objectlen;
1535 reply->segs[i].offset = mns[i].offset;
1536 reply->segs[i].size = mns[i].size;
/* NOTE(review): idx+1 nodes were acquired via get_mapnode above, but this
 * loop puts only idx of them — looks like a map-node reference leak unless
 * an elided line releases mns[idx].mn separately; TODO confirm. */
1539 for (i = 0; i < idx; i++) {
1540 put_mapnode(mns[i].mn);
/*
 * do_dropcache: evict `map` and all of its cached map nodes from the mapper
 * cache. Waits for any pending operation on each object before marking it
 * destroyed, then removes the map from the hashmap and drops the creation
 * reference so the map itself is freed.
 *
 * Returns 0 on success (per callers' usage); body partially elided here.
 */
1546 static int do_dropcache(struct peer_req *pr, struct map *map)
1548 struct map_node *mn;
1549 struct peerd *peer = pr->peer;
1550 struct mapperd *mapper = __get_mapperd(peer);
1552 XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1553 map->flags |= MF_MAP_DROPPING_CACHE;
1554 for (i = 0; i < calc_map_obj(map); i++) {
1555 mn = get_mapnode(map, i);
1557 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1558 //make sure all pending operations on all objects are completed
1559 if (mn->flags & MF_OBJECT_NOT_READY){
1560 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
/* BUGFIX: was `&=`, which wiped every flag and never actually set
 * MF_OBJECT_DESTROYED (the guard above tests the bit), so objects were
 * re-processed on every pass. Set the bit instead. */
1562 mn->flags |= MF_OBJECT_DESTROYED;
1567 map->flags &= ~MF_MAP_DROPPING_CACHE;
1568 map->flags |= MF_MAP_DESTROYED;
1569 remove_map(mapper, map);
1570 XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1571 put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
/*
 * do_info: serve an X_INFO request by writing the map's total size into the
 * xseg_reply_info structure placed in the request's data buffer.
 */
1575 static int do_info(struct peer_req *pr, struct map *map)
1577 struct peerd *peer = pr->peer;
1578 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1579 xinfo->size = map->size;
/*
 * do_close: close a map. If the map is held exclusively, the (elided) branch
 * presumably releases the exclusive lock first — TODO confirm; in all cases
 * the map and its objects are then evicted via do_dropcache().
 */
1584 static int do_close(struct peer_req *pr, struct map *map)
1586 // struct peerd *peer = pr->peer;
1587 // struct xseg_request *req;
1588 if (map->flags & MF_MAP_EXCLUSIVE)
1590 return do_dropcache(pr, map);
/*
 * do_destroy: delete `map` and all of its existing objects from the backing
 * store. First issues a synchronous delete of the map object itself, then
 * walks every map node, waiting out pending operations and deleting objects
 * that exist; finally closes/evicts the map via do_close().
 *
 * Returns the result of do_close() on success, negative on error (body
 * partially elided here).
 */
1593 static int do_destroy(struct peer_req *pr, struct map *map)
1596 struct peerd *peer = pr->peer;
1597 struct mapper_io *mio = __get_mapper_io(pr);
1598 struct map_node *mn;
1599 struct xseg_request *req;
1601 XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1602 map->flags |= MF_MAP_DELETING;
1603 req = delete_map(pr, map);
/* Block until the map-object deletion either fails or is served. */
1606 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1607 if (req->state & XS_FAILED){
1608 xseg_put_request(peer->xseg, req, pr->portno);
1609 map->flags &= ~MF_MAP_DELETING;
1612 xseg_put_request(peer->xseg, req, pr->portno);
1614 uint64_t nr_obj = calc_map_obj(map);
1615 uint64_t deleted = 0;
1616 while (deleted < nr_obj){
1618 for (i = 0; i < nr_obj; i++){
1619 mn = get_mapnode(map, i);
1621 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1622 if (mn->flags & MF_OBJECT_EXIST){
1623 //make sure all pending operations on all objects are completed
1624 if (mn->flags & MF_OBJECT_NOT_READY){
1625 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1627 req = delete_object(pr, mn);
1629 if (mio->del_pending){
/* BUGFIX: was `&=`, which cleared every flag (including MF_OBJECT_EXIST)
 * rather than setting MF_OBJECT_DESTROYED — the guard above tests the bit,
 * so destroyed objects were revisited. Set the bit instead. */
1638 mn->flags |= MF_OBJECT_DESTROYED;
/* Wait for all outstanding object deletions before finishing. */
1645 mio->cb = deletion_cb;
1646 wait_on_pr(pr, mio->del_pending > 0);
1649 map->flags &= ~MF_MAP_DELETING;
1650 XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1651 return do_close(pr, map);
/*
 * do_mapr: serve a map-read (X_MAPR) request by delegating to req2objs()
 * with write == 0, then log the resulting scatterlist at debug level.
 * Returns negative on req2objs failure (per the error log path).
 */
1654 static int do_mapr(struct peer_req *pr, struct map *map)
1656 struct peerd *peer = pr->peer;
1657 int r = req2objs(pr, map, 0);
1659 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1661 (unsigned long long) pr->req->offset,
1662 (unsigned long long) (pr->req->offset + pr->req->size));
1665 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1667 (unsigned long long) pr->req->offset,
1668 (unsigned long long) (pr->req->offset + pr->req->size));
1669 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1670 (unsigned long long) pr->req->offset,
1671 (unsigned long long) pr->req->size);
/* Debug dump of each reply segment; buf gives a NUL-terminated target name. */
1672 char buf[XSEG_MAX_TARGETLEN+1];
1673 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1675 for (i = 0; i < reply->cnt; i++) {
1676 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1677 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1678 buf[reply->segs[i].targetlen] = 0;
1679 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1680 (unsigned long long) reply->segs[i].offset,
1681 (unsigned long long) reply->segs[i].size);
/*
 * do_mapw: serve a map-write (X_MAPW) request by delegating to req2objs()
 * with write == 1, then log the resulting scatterlist at debug level.
 * NOTE(review): mirror image of do_mapr() except for the write flag and log
 * text — a shared helper would remove the duplication.
 */
1686 static int do_mapw(struct peer_req *pr, struct map *map)
1688 struct peerd *peer = pr->peer;
1689 int r = req2objs(pr, map, 1);
1691 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1693 (unsigned long long) pr->req->offset,
1694 (unsigned long long) (pr->req->offset + pr->req->size));
1697 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1699 (unsigned long long) pr->req->offset,
1700 (unsigned long long) (pr->req->offset + pr->req->size));
1701 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1702 (unsigned long long) pr->req->offset,
1703 (unsigned long long) pr->req->size);
/* Debug dump of each reply segment; buf gives a NUL-terminated target name. */
1704 char buf[XSEG_MAX_TARGETLEN+1];
1705 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1707 for (i = 0; i < reply->cnt; i++) {
1708 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1709 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1710 buf[reply->segs[i].targetlen] = 0;
1711 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1712 (unsigned long long) reply->segs[i].offset,
1713 (unsigned long long) reply->segs[i].size);
//here map is the parent map
/*
 * do_clone: create a new map (`clonemap`) cloned from the parent `map`.
 * The clone's size comes from the xseg_request_clone payload (-1 means
 * "same as parent"). Object slots covered by the parent copy the parent's
 * object names (copy-on-write: flags start at 0, i.e. not MF_OBJECT_EXIST);
 * slots beyond the parent are filled with the zero block. Finally the new
 * map is written out to the backing store.
 * Body partially elided; error paths between the visible lines are missing.
 */
1719 static int do_clone(struct peer_req *pr, struct map *map)
1722 FIXME check if clone map exists
1723 clonemap = get_map(pr, target, targetlen, MF_LOAD);
1725 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1726 // cache on non-exclusive opens or declare a NO_CACHE flag ?
1731 char buf[XSEG_MAX_TARGETLEN];
1732 struct peerd *peer = pr->peer;
1733 struct mapperd *mapper = __get_mapperd(peer);
1734 char *target = xseg_get_target(peer->xseg, pr->req);
1735 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1736 XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1737 struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
/* size == -1 requests a clone of exactly the parent's size. */
1742 if (xclone->size == -1)
1743 clonemap->size = map->size;
1745 clonemap->size = xclone->size;
/* A clone smaller than its parent is rejected (warned and, presumably,
 * failed in the elided lines — TODO confirm). */
1746 if (clonemap->size < map->size){
1747 target = xseg_get_target(peer->xseg, pr->req);
1748 strncpy(buf, target, pr->req->targetlen);
1749 buf[pr->req->targetlen] = 0;
1750 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1751 "\n\t for requested clone %s",
1752 (unsigned long long) xclone->size,
1753 (unsigned long long) map->size, buf);
1757 //alloc and init map_nodes
/* NOTE(review): size/block_size + 1 over-allocates one node when size is an
 * exact multiple of block_size (cf. the round-up idiom in handle_clone and
 * print_map) — TODO confirm intent. */
1758 unsigned long c = clonemap->size/block_size + 1;
1759 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1764 for (i = 0; i < clonemap->size/block_size + 1; i++) {
1765 struct map_node *mn = get_mapnode(map, i);
/* Within the parent's range: reuse the parent's object name. */
1767 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1768 map_nodes[i].objectlen = mn->objectlen;
/* Beyond the parent's range: point at the well-known zero block. */
1771 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1772 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1774 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1775 map_nodes[i].flags = 0;
1776 map_nodes[i].objectidx = i;
1777 map_nodes[i].map = clonemap;
1778 map_nodes[i].ref = 1;
1779 map_nodes[i].waiters = 0;
1780 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1781 r = insert_object(clonemap, &map_nodes[i]);
1783 XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
/* Persist the freshly built clone map. */
1787 r = write_map(pr, clonemap);
1789 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
/*
 * open_load_map: optionally acquire exclusive access to the map object
 * (MF_EXCLUSIVE, with MF_FORCE presumably allowing a forced open — TODO
 * confirm elided branch), then load the map's contents. On load failure
 * after a successful open, the (elided) cleanup presumably closes it again.
 */
1799 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1802 if (flags & MF_EXCLUSIVE){
1803 r = open_map(pr, map, flags);
1805 if (flags & MF_FORCE){
1812 r = load_map(pr, map);
1813 if (r < 0 && opened){
/*
 * get_map: look up a map by name in the mapper's cache; on a miss, and only
 * if MF_LOAD is set, create a fresh map struct and open/load it from the
 * backing store (dropping the cache entry again if that fails). A cached map
 * already marked MF_MAP_DESTROYED is handled separately (branch elided).
 * Returns the map on success, presumably NULL on failure — TODO confirm.
 */
1819 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1823 struct peerd *peer = pr->peer;
1824 struct mapperd *mapper = __get_mapperd(peer);
1825 struct map *map = find_map_len(mapper, name, namelen, flags);
1827 if (flags & MF_LOAD){
1828 map = create_map(mapper, name, namelen, flags);
1831 r = open_load_map(pr, map, flags);
1833 do_dropcache(pr, map);
1839 } else if (map->flags & MF_MAP_DESTROYED){
/*
 * map_action: common driver for the do_* handlers. Resolves the named map
 * via get_map(), waits until the map is ready, runs `action` on it, and
 * evicts the map from cache afterwards unless it is held exclusively.
 */
1847 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1848 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1850 //struct peerd *peer = pr->peer;
1853 map = get_map(pr, name, namelen, flags);
1856 if (map->flags & MF_MAP_NOT_READY){
1857 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1861 int r = action(pr, map);
1862 //always drop cache if map was not opened exclusively
1863 if (!(map->flags & MF_MAP_EXCLUSIVE))
1864 do_dropcache(pr, map);
/*
 * handle_info: st_thread entry point for X_INFO requests — runs do_info()
 * on the request's target map via map_action().
 */
1870 void * handle_info(struct peer_req *pr)
1872 struct peerd *peer = pr->peer;
1873 char *target = xseg_get_target(peer->xseg, pr->req);
1874 int r = map_action(do_info, pr, target, pr->req->targetlen,
/*
 * handle_clone: st_thread entry point for X_CLONE requests. Two modes:
 *  - xclone->targetlen set: clone from an existing (pithos) map via
 *    map_action(do_clone, ...);
 *  - otherwise: create a brand-new empty volume of xclone->size, populated
 *    entirely with zero-block objects, and write it to the backing store.
 * Body partially elided; several error/return paths are missing here.
 */
1884 void * handle_clone(struct peer_req *pr)
1887 struct peerd *peer = pr->peer;
1888 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1893 if (xclone->targetlen){
1894 //support clone only from pithos
1895 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1903 char *target = xseg_get_target(peer->xseg, pr->req);
1904 XSEGLOG2(&lc, I, "Creating volume");
/* Creation must fail if the volume already exists in the cache/store. */
1905 map = get_map(pr, target, pr->req->targetlen,
1908 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1909 if (map->ref <= 2) //initial one + one ref from __get_map
1910 do_dropcache(pr, map); //we are the only ones using this map. Drop the cache.
1911 put_map(map); //matches get_map
1915 //create a new empty map of size
1916 map = create_map(mapper, target, pr->req->targetlen,
1922 map->size = xclone->size;
1923 //populate_map with zero objects;
/* Round object count up to cover a trailing partial block. */
1924 uint64_t nr_objs = xclone->size / block_size;
1925 if (xclone->size % block_size)
1928 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1930 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
/* Every object of a new volume starts as the well-known zero block. */
1935 for (i = 0; i < nr_objs; i++) {
1936 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1937 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1938 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1939 map_nodes[i].flags = 0;
1940 map_nodes[i].objectidx = i;
1941 map_nodes[i].map = map;
1942 map_nodes[i].ref = 1;
1943 map_nodes[i].waiters = 0;
1944 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1945 r = insert_object(map, &map_nodes[i]);
1947 do_dropcache(pr, map);
/* Persist the new map object. */
1952 r = write_map(pr, map);
1954 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1955 do_dropcache(pr, map);
1958 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1960 do_dropcache(pr, map); //drop cache here for consistency
/*
 * handle_mapr: st_thread entry point for X_MAPR — runs do_mapr() with
 * load + exclusive semantics on the request's target map.
 */
1972 void * handle_mapr(struct peer_req *pr)
1974 struct peerd *peer = pr->peer;
1975 char *target = xseg_get_target(peer->xseg, pr->req);
1976 int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1977 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/*
 * handle_mapw: st_thread entry point for X_MAPW — runs do_mapw() with
 * load + exclusive + force semantics (writes require the exclusive lock,
 * forced if needed) on the request's target map.
 */
1986 void * handle_mapw(struct peer_req *pr)
1988 struct peerd *peer = pr->peer;
1989 char *target = xseg_get_target(peer->xseg, pr->req);
1990 int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1991 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1996 XSEGLOG2(&lc, D, "Ta: %d", ta);
/*
 * handle_destroy: st_thread entry point for X_DELETE — runs do_destroy()
 * with load + exclusive + force semantics on the request's target map.
 */
2001 void * handle_destroy(struct peer_req *pr)
2003 struct peerd *peer = pr->peer;
2004 char *target = xseg_get_target(peer->xseg, pr->req);
2005 int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2006 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
/*
 * handle_close: st_thread entry point for X_CLOSE — runs do_close() on the
 * target map WITHOUT MF_LOAD: closing must only act on an already-cached
 * map, never load one from the store just to close it.
 */
2015 void * handle_close(struct peer_req *pr)
2017 struct peerd *peer = pr->peer;
2018 char *target = xseg_get_target(peer->xseg, pr->req);
2019 //here we do not want to load
2020 int r = map_action(do_close, pr, target, pr->req->targetlen,
2021 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
/*
 * dispatch_accepted: route a newly accepted request to its handler and spawn
 * a state-thread running it. The handler map covers the mapper's primary
 * xseg operations; X_SNAPSHOT is not implemented yet.
 */
2030 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2031 struct xseg_request *req)
2033 //struct mapperd *mapper = __get_mapperd(peer);
2034 struct mapper_io *mio = __get_mapper_io(pr);
2035 void *(*action)(struct peer_req *) = NULL;
2037 mio->state = ACCEPTED;
2040 switch (pr->req->op) {
2041 /* primary xseg operations of mapper */
2042 case X_CLONE: action = handle_clone; break;
2043 case X_MAPR: action = handle_mapr; break;
2044 case X_MAPW: action = handle_mapw; break;
2045 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2046 case X_INFO: action = handle_info; break;
2047 case X_DELETE: action = handle_destroy; break;
2048 case X_CLOSE: action = handle_close; break;
2049 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/* NOTE(review): on an unknown op `action` stays NULL; the elided lines
 * before this call presumably bail out — otherwise st_thread_create would
 * be handed a NULL entry point. TODO confirm. */
2054 st_thread_create(action, pr, 0, 0);
/*
 * dispatch: top-level peer dispatch hook. Newly accepted requests are handed
 * to dispatch_accepted(); other dispatch reasons (replies etc.) are handled
 * in the elided remainder of the function.
 */
2060 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2061 enum dispatch_reason reason)
2063 struct mapperd *mapper = __get_mapperd(peer);
2065 struct mapper_io *mio = __get_mapper_io(pr);
2069 if (reason == dispatch_accept)
2070 dispatch_accepted(peer, pr, req);
/*
 * custom_peer_init: mapper-specific peer initialization. Allocates the
 * mapperd state and a per-request mapper_io, parses the -bp/-mbp port
 * arguments, enforces the single-thread restriction, and raises the
 * scheduling class to SCHED_FIFO. Body partially elided; allocation error
 * checks are marked FIXME in the original.
 */
2081 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2085 //FIXME error checks
2086 struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2087 peer->priv = mapperd;
2089 mapper->hashmaps = xhash_new(3, STRING);
2091 printf("%llu \n", MAX_VOLUME_SIZE);
2092 for (i = 0; i < peer->nr_ops; i++) {
2093 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2094 mio->copyups_nodes = xhash_new(3, INTEGER);
2098 peer->peer_reqs[i].priv = mio;
2101 for (i = 0; i < argc; i++) {
/* -bp: blocker port for object I/O */
2102 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2103 mapper->bportno = atoi(argv[i+1]);
/* -mbp: blocker port for map-object I/O */
2107 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2108 mapper->mbportno = atoi(argv[i+1]);
2112 /* enforce only one thread */
2113 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2114 int t = atoi(argv[i+1]);
2116 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2124 const struct sched_param param = { .sched_priority = 99 };
/* BUGFIX: the third argument had been mangled to the mojibake "¶m"
 * (an HTML "&para;" corruption of "&param"); restore the address-of. */
2125 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
/*
 * print_obj: debug helper — dump one map node (index, name, name length,
 * existence flag) to stderr.
 */
2133 void print_obj(struct map_node *mn)
2135 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
2136 (unsigned long long) mn->objectidx, mn->object,
2137 (unsigned int) mn->objectlen,
2138 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * print_map: debug helper — dump a map's header fields and every object it
 * contains to stderr/stdout. Object count is size/block_size, rounded up
 * for a trailing partial block; maps over 1,000,000 objects are skipped as
 * a guard against corrupt sizes.
 */
2141 void print_map(struct map *m)
2143 uint64_t nr_objs = m->size/block_size;
2144 if (m->size % block_size)
2146 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
2147 m->volume, m->volumelen,
2148 (unsigned long long) m->size,
2149 (unsigned long long) nr_objs,
2152 struct map_node *mn;
2153 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2155 for (i = 0; i < nr_objs; i++) {
2156 mn = find_object(m, i);
2158 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2166 void test_map(struct peerd *peer)
2169 //struct sha256_ctx sha256ctx;
2170 unsigned char buf[SHA256_DIGEST_SIZE];
2171 char buf_new[XSEG_MAX_TARGETLEN + 20];
2172 struct map *m = malloc(sizeof(struct map));
2173 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2174 m->volume[XSEG_MAX_TARGETLEN] = 0;
2175 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2176 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2177 m->volumelen = XSEG_MAX_TARGETLEN;
2178 m->size = 100*block_size;
2179 m->objects = xhash_new(3, INTEGER);
2180 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2181 for (i = 0; i < 100; i++) {
2182 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2183 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2185 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2186 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2188 map_node[i].objectidx = i;
2189 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2190 map_node[i].flags = MF_OBJECT_EXIST;
2191 ret = insert_object(m, &map_node[i]);
2194 char *data = malloc(block_size);
2195 mapheader_to_map(m, data);
2196 uint64_t pos = mapheader_size;
2198 for (i = 0; i < 100; i++) {
2199 map_node = find_object(m, i);
2201 printf("no object node %d \n", i);
2204 object_to_map(data+pos, map_node);
2205 pos += objectsize_in_map;
2209 struct map *m2 = malloc(sizeof(struct map));
2210 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2211 m->volume[XSEG_MAX_TARGETLEN] = 0;
2212 m->volumelen = XSEG_MAX_TARGETLEN;
2214 m2->objects = xhash_new(3, INTEGER);
2215 ret = read_map(peer, m2, data);
2218 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2220 while (sum < block_size) {
2221 r = write(fd, data + sum, block_size -sum);
2224 printf("write error\n");
2230 map_node = find_object(m, 0);