8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
15 #include <sys/syscall.h>
16 #include <openssl/sha.h>
18 /* general mapper flags */
19 #define MF_LOAD (1 << 0)
20 #define MF_EXCLUSIVE (1 << 1)
21 #define MF_FORCE (1 << 2)
22 #define MF_ARCHIP (1 << 3)
/* Fallback in case <openssl/sha.h> does not define it; SHA-256 digests
 * are 32 bytes.  NOTE(review): the matching #endif is outside this excerpt. */
24 #ifndef SHA256_DIGEST_SIZE
25 #define SHA256_DIGEST_SIZE 32
27 /* hex representation of sha256 value takes up double the sha256 size */
28 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
/* 4 MiB per data object / map block. */
30 #define block_size (1<<22) //FIXME this should be defined here?
32 /* transparency byte + max object len in disk */
33 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
35 /* Map header contains: version (uint32_t) followed by size (uint64_t) */
39 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
/* Every archipelago-managed name carries this prefix. */
42 #define MAPPER_PREFIX "archip_"
43 #define MAPPER_PREFIX_LEN 7
45 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
48 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49 #error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
52 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
54 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55 #error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
/* Largest representable volume: number of object slots that fit in one
 * map block, times the data each object carries. */
58 #define MAX_VOLUME_SIZE \
59 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
/* Well-known name of the all-zero object: 64 hex zeros.
 * NOTE(review): only read in this excerpt (strncmp in copyup_object);
 * could likely be `const char *const`, but other translation units may
 * reference it -- confirm before tightening. */
62 char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
65 /* dispatch_internal mapper states */
/* Completion-callback type for backend xseg requests; deletion_cb and
 * copyup_cb below match this signature. */
74 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
77 /* mapper object flags */
78 #define MF_OBJECT_EXIST (1 << 0)
79 #define MF_OBJECT_COPYING (1 << 1)
80 #define MF_OBJECT_WRITING (1 << 2)
81 #define MF_OBJECT_DELETING (1 << 3)
/* NOTE(review): bit (1 << 4) is skipped -- not visible in this excerpt
 * whether it is reserved or simply unused. */
82 #define MF_OBJECT_DESTROYED (1 << 5)
/* An object is busy while a copy-up, write or deletion is in flight.
 * The struct map_node field below belongs to a struct whose opening
 * lines fall outside this excerpt. */
84 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
90 char object[MAX_OBJECT_LEN + 1]; /* NULL terminated string */
/*
 * st-thread synchronization helper macros.
 * wait_on_* spin on __condition__, parking the current worker on the
 * corresponding condition variable; signal_* wake waiters.
 * NOTE(review): these macros are unhygienic -- they reference `pr` and
 * the global(?) `ta` from the caller's scope rather than their own
 * parameters (e.g. __get_mapper_io(pr) inside wait_on_pr/signal_pr).
 * NOTE(review): signal_mapnode uses st_cond_broadcast while signal_map
 * uses st_cond_signal -- presumably many object waiters vs. one map
 * waiter; confirm before unifying.
 */
98 #define wait_on_pr(__pr, __condition__) \
99 while (__condition__){ \
101 __get_mapper_io(pr)->active = 0;\
102 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
103 st_cond_wait(__pr->cond); \
106 #define wait_on_mapnode(__mn, __condition__) \
107 while (__condition__){ \
110 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
111 ta: %u", __mn, __mn->object, __mn->waiters, ta); \
112 st_cond_wait(__mn->cond); \
115 #define wait_on_map(__map, __condition__) \
116 while (__condition__){ \
119 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
120 __map, __map->volume, __map->waiters, ta); \
121 st_cond_wait(__map->cond); \
124 #define signal_pr(__pr) \
126 if (!__get_mapper_io(pr)->active){\
128 XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta);\
129 __get_mapper_io(pr)->active = 1;\
130 st_cond_signal(__pr->cond); \
134 #define signal_map(__map) \
136 if (__map->waiters) { \
138 XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
139 ta: %u", __map, __map->volume, __map->waiters, ta); \
141 st_cond_signal(__map->cond); \
145 #define signal_mapnode(__mn) \
147 if (__mn->waiters) { \
148 ta += __mn->waiters; \
149 XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
150 %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
152 st_cond_broadcast(__mn->cond); \
/* map lifecycle/state flags for struct map */
158 #define MF_MAP_LOADING (1 << 0)
159 #define MF_MAP_DESTROYED (1 << 1)
160 #define MF_MAP_WRITING (1 << 2)
161 #define MF_MAP_DELETING (1 << 3)
162 #define MF_MAP_DROPPING_CACHE (1 << 4)
163 #define MF_MAP_EXCLUSIVE (1 << 5)
164 #define MF_MAP_OPENING (1 << 6)
165 #define MF_MAP_CLOSING (1 << 7)
166 #define MF_MAP_DELETED (1 << 8)
/* NOTE(review): MF_MAP_CLOSING is not part of NOT_READY even though
 * close_map also blocks on an in-flight request -- verify intended. */
168 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
/* --- struct field excerpts; the enclosing struct map / struct mapperd /
 * struct mapper_io definitions are partly outside this view --- */
176 char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
177 xhash_t *objects; /* obj_index --> map_node */
/* mapper daemon state: the two blocker ports it talks to */
184 xport bportno; /* blocker that accesses data */
185 xport mbportno; /* blocker that accesses maps */
186 xhash_t *hashmaps; // hash_function(target) --> struct map
/* per-request (mapper_io) bookkeeping */
190 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
191 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
192 struct map_node *copyup_node;
193 volatile int err; /* error flag */
194 volatile uint64_t del_pending;
198 enum mapper_state state;
203 struct mapperd *mapper;
/* debugging helper, defined elsewhere in the file */
205 void print_map(struct map *m);
/* Print the mapper-specific command-line options to stderr. */
208 void custom_peer_usage()
210 fprintf(stderr, "Custom peer options: \n"
211 "-bp : port for block blocker(!)\n"
212 "-mbp : port for map blocker\n"
/* Daemon-wide mapper state hangs off the peer's private pointer. */
221 static inline struct mapperd * __get_mapperd(struct peerd *peer)
223 return (struct mapperd *) peer->priv;
/* Per-request mapper io state hangs off the peer request's private pointer. */
226 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
228 return (struct mapper_io *) pr->priv;
/* Number of objects a map of map->size bytes spans; the visible modulo
 * test rounds a partial tail block up (increment line outside excerpt). */
231 static inline uint64_t calc_map_obj(struct map *map)
235 uint64_t nr_objs = map->size / block_size;
236 if (map->size % block_size)
/* Count how many objects an xseg request touches: consume the (possibly
 * unaligned) first block, then whole blocks until the size is exhausted.
 * block_size is a power of two, so `& (block_size - 1)` is the modulo. */
241 static uint32_t calc_nr_obj(struct xseg_request *req)
244 uint64_t rem_size = req->size;
245 uint64_t obj_offset = req->offset & (block_size -1); //modulo
246 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247 rem_size -= obj_size;
248 while (rem_size > 0) {
249 obj_size = (rem_size > block_size) ? block_size : rem_size;
250 rem_size -= obj_size;
/*
 * hexlify: render a full SHA-256 digest as lowercase hex into `hex`.
 * Unsafe: no bounds checking; always assumes a whole digest.
 * NOTE(review): the loop bound is OpenSSL's SHA256_DIGEST_LENGTH rather
 * than this file's SHA256_DIGEST_SIZE -- same value (32) but
 * inconsistent naming.
 */
260 static void hexlify(unsigned char *data, char *hex)
263 for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264 sprintf(hex+2*i, "%02x", data[i]);
/* unhexlify: inverse of hexlify -- parse 2*SHA256_DIGEST_LENGTH hex
 * characters back into raw bytes (nibble assembly partly outside view). */
267 static void unhexlify(char *hex, unsigned char *data)
271 for (i=0; i<SHA256_DIGEST_LENGTH; i++){
286 data[i] |= (c << 4) & 0xF0;
304 /* Maps handling functions */
/* Look up an in-memory map by its NUL-terminated volume name. */
307 static struct map * find_map(struct mapperd *mapper, char *volume)
309 struct map *m = NULL;
310 int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
/* Like find_map, but for a (target, targetlen) pair that is not
 * NUL-terminated; with MF_ARCHIP the MAPPER_PREFIX is prepended first.
 * NOTE(review): the MAX_VOLUME_LEN check runs *after* the strncpy into
 * buf.  buf holds XSEG_MAX_TARGETLEN+1 bytes and MAX_VOLUME_LEN equals
 * XSEG_MAX_TARGETLEN, so in the MF_ARCHIP branch a targetlen close to
 * XSEG_MAX_TARGETLEN writes past buf (index MAPPER_PREFIX_LEN+targetlen)
 * before the check rejects it -- potential stack buffer overflow;
 * the check should precede the copies. */
317 static struct map * find_map_len(struct mapperd *mapper, char *target,
318 uint32_t targetlen, uint32_t flags)
320 char buf[XSEG_MAX_TARGETLEN+1];
321 if (flags & MF_ARCHIP){
322 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325 targetlen += MAPPER_PREFIX_LEN;
328 strncpy(buf, target, targetlen);
332 if (targetlen > MAX_VOLUME_LEN){
333 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334 targetlen, MAX_VOLUME_LEN);
338 XSEGLOG2(&lc, D, "looking up map %s, len %u",
340 return find_map(mapper, buf);
/* Register a map in the daemon hash, rejecting duplicates; grows the
 * hash (retrying the insert) while it reports ERESIZE.
 * Return paths fall partly outside this excerpt. */
344 static int insert_map(struct mapperd *mapper, struct map *map)
348 if (find_map(mapper, map->volume)){
349 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
/* NOTE(review): "%d" with a size_t strlen argument -- works on common
 * ABIs but "%zu" would be correct. */
353 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
354 map->volume, strlen(map->volume), (unsigned long) map);
355 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356 while (r == -XHASH_ERESIZE) {
357 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
360 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361 (unsigned long long) shift);
364 mapper->hashmaps = new_hashmap;
365 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Unregister a map from the daemon hash, shrinking the hash (and
 * retrying the delete) while it reports ERESIZE. */
371 static int remove_map(struct mapperd *mapper, struct map *map)
375 //assert no pending pr on map
377 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378 while (r == -XHASH_ERESIZE) {
379 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
382 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383 (unsigned long long) shift);
386 mapper->hashmaps = new_hashmap;
387 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* Build and submit a close request for `map` to the map blocker
 * (mbportno), marking the map MF_MAP_CLOSING.  Returns the in-flight
 * request; on failure the tail puts the request back and returns NULL
 * (return lines fall outside this excerpt). */
393 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
397 struct peerd *peer = pr->peer;
398 struct xseg_request *req;
399 struct mapperd *mapper = __get_mapperd(peer);
402 XSEGLOG2(&lc, I, "Closing map %s", map->volume);
404 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
406 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
411 r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
413 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
418 char *reqtarget = xseg_get_target(peer->xseg, req);
/* strncpy does not NUL-terminate here; the target is length-delimited
 * by req->targetlen, so that is fine by xseg convention. */
421 strncpy(reqtarget, map->volume, req->targetlen);
425 r = xseg_set_req_data(peer->xseg, req, pr);
427 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
431 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
433 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
437 r = xseg_signal(peer->xseg, p);
438 map->flags |= MF_MAP_CLOSING;
440 XSEGLOG2(&lc, I, "Map %s closing", map->volume);
/* error cleanup: detach pr from the request, then return it to the pool */
444 xseg_get_req_data(peer->xseg, req, &dummy);
446 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous wrapper around __close_map: block the st-thread until the
 * request is served or fails, then clear MF_MAP_CLOSING and report the
 * failure bit (return lines fall outside this excerpt). */
451 static int close_map(struct peer_req *pr, struct map *map)
454 struct xseg_request *req;
455 struct peerd *peer = pr->peer;
457 req = __close_map(pr, map);
/* NOTE(review): a NULL check on req is not visible between the call and
 * this wait -- confirm it exists on the elided line. */
460 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461 map->flags &= ~MF_MAP_CLOSING;
462 err = req->state & XS_FAILED;
463 xseg_put_request(peer->xseg, req, pr->portno);
/* Return an already-loaded map via *m, queueing pr on the map's pending
 * queue if the map is busy; otherwise kick off a load/open.
 * NOTE(review): this excerpt calls find_map(mapper, target, targetlen)
 * and open_map(peer, pr, target, targetlen, 0), which do not match the
 * two-argument find_map and (pr, map, flags) open_map defined above --
 * likely stale code or overloads on elided lines; verify against the
 * full file. */
470 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
471 char *target, uint32_t targetlen, struct map **m)
473 struct mapperd *mapper = __get_mapperd(peer);
475 *m = find_map(mapper, target, targetlen);
477 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478 if ((*m)->flags & MF_MAP_NOT_READY) {
479 __xq_append_tail(&(*m)->pending, (xqindex) pr);
480 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
482 //} else if ((*m)->flags & MF_MAP_DESTROYED){
486 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
490 r = open_map(peer, pr, target, targetlen, 0);
497 /* Object handling functions */
/* Look up the map_node at obj_index in map->objects. */
500 struct map_node *find_object(struct map *map, uint64_t obj_index)
503 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert mn keyed by its objectidx, growing the hash once on ERESIZE.
 * NOTE(review): xhash_resize's result is assigned to map->objects
 * directly -- a NULL check is not visible here (may be on elided
 * lines); a failed resize would clobber the old table pointer. */
509 static int insert_object(struct map *map, struct map_node *mn)
511 //FIXME no find object first
512 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513 if (r == -XHASH_ERESIZE) {
514 unsigned long shift = xhash_grow_size_shift(map->objects);
515 map->objects = xhash_resize(map->objects, shift, NULL);
518 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
525 /* map read/write functions */
/* Fill mn from a pithos (v0) map entry: 32 raw digest bytes become a
 * hex object name; pithos objects always exist. */
527 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
529 hexlify(buf, mn->object);
530 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531 mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532 mn->flags = MF_OBJECT_EXIST;
/* Fill mn from an archip map entry: buf[0] is the transparency/exists
 * byte, buf+1 the raw digest; the archip branch prepends MAPPER_PREFIX
 * (branch condition falls on an elided line). */
535 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
540 mn->flags |= MF_OBJECT_EXIST;
541 strcpy(mn->object, MAPPER_PREFIX);
542 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543 mn->object[MAX_OBJECT_LEN] = 0;
544 mn->objectlen = strlen(mn->object);
547 hexlify(buf+1, mn->object);
548 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549 mn->objectlen = strlen(mn->object);
/* Serialize mn into its objectsize_in_map on-disk slot (inverse of
 * map_to_object): exists byte, then the raw digest. */
554 static inline void object_to_map(char* buf, struct map_node *mn)
556 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
558 /* strip common prefix */
559 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
562 unhexlify(mn->object, (unsigned char *)(buf+1));
/* Serialize the map header -- version then size -- into buf; layout
 * matches mapheader_size. */
566 static inline void mapheader_to_map(struct map *m, char *buf)
569 memcpy(buf + pos, &m->version, sizeof(m->version));
570 pos += sizeof(m->version);
571 memcpy(buf + pos, &m->size, sizeof(m->size));
572 pos += sizeof(m->size);
/* Submit a write of mn's single map slot to the map blocker: target is
 * the map volume, offset = mapheader_size + objectidx * slot size.
 * Returns the in-flight request, or NULL via the error tail. */
576 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577 struct map *map, struct map_node *mn)
580 struct mapperd *mapper = __get_mapperd(peer);
581 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582 mapper->mbportno, X_ALLOC);
584 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
586 mn->object, map->volume, (unsigned long long) mn->objectidx);
589 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
/* NOTE(review): this message says "Cannot allocate" but this is the
 * prep-request branch -- copy/paste from above (runtime string left
 * untouched here; should say "Cannot prepare"). */
591 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
593 mn->object, map->volume, (unsigned long long) mn->objectidx);
596 char *target = xseg_get_target(peer->xseg, req);
597 strncpy(target, map->volume, req->targetlen);
598 req->size = objectsize_in_map;
599 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
601 char *data = xseg_get_data(peer->xseg, req);
602 object_to_map(data, mn);
604 r = xseg_set_req_data(peer->xseg, req, pr);
606 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
608 mn->object, map->volume, (unsigned long long) mn->objectidx);
611 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
613 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
615 mn->object, map->volume, (unsigned long long) mn->objectidx);
618 r = xseg_signal(peer->xseg, p);
620 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
622 XSEGLOG2(&lc, I, "Writing object %s \n\t"
624 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* error cleanup: detach pr, return the request to the pool */
629 xseg_get_req_data(peer->xseg, req, &dummy);
631 xseg_put_request(peer->xseg, req, pr->portno);
633 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
635 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Serialize the whole map -- header followed by one slot per object --
 * into a single request to the map blocker and submit it, marking the
 * map MF_MAP_WRITING.  Returns the in-flight request or NULL on error. */
639 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
642 struct peerd *peer = pr->peer;
643 struct mapperd *mapper = __get_mapperd(peer);
645 uint64_t i, pos, max_objidx = calc_map_obj(map);
646 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647 mapper->mbportno, X_ALLOC);
649 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
652 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653 mapheader_size + max_objidx * objectsize_in_map);
655 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
658 char *target = xseg_get_target(peer->xseg, req);
659 strncpy(target, map->volume, req->targetlen);
660 char *data = xseg_get_data(peer->xseg, req);
661 mapheader_to_map(map, data);
662 pos = mapheader_size;
664 req->size = req->datalen;
667 if (map->size % block_size)
669 for (i = 0; i < max_objidx; i++) {
670 mn = find_object(map, i);
/* NOTE(review): "%lli" paired with an unsigned long long cast --
 * "%llu" would match; harmless in practice for these values. */
672 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673 (unsigned long long) i, map->volume);
676 object_to_map(data+pos, mn);
677 pos += objectsize_in_map;
679 r = xseg_set_req_data(peer->xseg, req, pr);
681 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
685 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
687 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
691 r = xseg_signal(peer->xseg, p);
693 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
695 map->flags |= MF_MAP_WRITING;
696 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
/* error cleanup: detach pr, return the request to the pool */
700 xseg_get_req_data(peer->xseg, req, &dummy);
702 xseg_put_request(peer->xseg, req, pr->portno);
704 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Synchronous map write: submit __write_map, park the st-thread until
 * the request is served or fails, then clear MF_MAP_WRITING. */
708 static int write_map(struct peer_req* pr, struct map *map)
711 struct peerd *peer = pr->peer;
712 struct xseg_request *req = __write_map(pr, map);
715 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
716 if (req->state & XS_FAILED)
718 xseg_put_request(peer->xseg, req, pr->portno);
719 map->flags &= ~MF_MAP_WRITING;
/* Submit a block_size read of the map object from the map blocker and
 * mark the map MF_MAP_LOADING.  Returns the in-flight request or NULL.
 * NOTE(review): the log text "Loading ng map" looks like a typo
 * ("Loading map"?) -- runtime string, left untouched here. */
723 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
727 struct xseg_request *req;
728 struct peerd *peer = pr->peer;
729 struct mapperd *mapper = __get_mapperd(peer);
732 XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
734 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
736 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
741 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
743 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
748 char *reqtarget = xseg_get_target(peer->xseg, req);
751 strncpy(reqtarget, m->volume, req->targetlen);
753 req->size = block_size;
755 r = xseg_set_req_data(peer->xseg, req, pr);
757 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
761 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
763 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
767 r = xseg_signal(peer->xseg, p);
769 m->flags |= MF_MAP_LOADING;
770 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
/* error cleanup: detach pr, return the request to the pool */
774 xseg_get_req_data(peer->xseg, req, &dummy);
776 xseg_put_request(peer->xseg, req, pr->portno);
/* Parse a freshly read map block into map_node entries.
 * An all-zero leading digest means an empty/new map.  Map "type" is
 * derived from the volume name: archip_ prefix => v1 layout (header +
 * exists-byte slots), otherwise pithos v0 layout (raw digests only,
 * terminated by a zero digest). */
781 static int read_map (struct map *map, unsigned char *buf)
783 char nulls[SHA256_DIGEST_SIZE];
784 memset(nulls, 0, SHA256_DIGEST_SIZE);
786 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
788 XSEGLOG2(&lc, D, "Read zeros");
792 //type 1, archip type, type 0 pithos map
793 int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
797 struct map_node *map_node;
/* NOTE(review): direct pointer-cast reads assume buf is suitably
 * aligned; memcpy would avoid any alignment/strict-aliasing concerns. */
800 map->version = *(uint32_t *) (buf + pos);
801 pos += sizeof(uint32_t);
802 map->size = *(uint64_t *) (buf + pos);
803 pos += sizeof(uint64_t);
804 nr_objs = map->size / block_size;
805 if (map->size % block_size)
/* NOTE(review): a NULL check for this calloc is not visible here --
 * presumably on an elided line; confirm. */
807 map_node = calloc(nr_objs, sizeof(struct map_node));
811 for (i = 0; i < nr_objs; i++) {
812 map_node[i].map = map;
813 map_node[i].objectidx = i;
814 map_node[i].waiters = 0;
816 map_node[i].cond = st_cond_new(); //FIXME err check;
817 map_to_object(&map_node[i], buf + pos);
818 pos += objectsize_in_map;
819 r = insert_object(map, &map_node[i]); //FIXME error check
/* pithos (v0) branch: scan raw digests until a zero digest terminates */
823 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824 map_node = calloc(max_nr_objs, sizeof(struct map_node));
827 for (i = 0; i < max_nr_objs; i++) {
828 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
830 map_node[i].objectidx = i;
831 map_node[i].map = map;
832 map_node[i].waiters = 0;
834 map_node[i].cond = st_cond_new(); //FIXME err check;
835 pithosmap_to_object(&map_node[i], buf + pos);
836 pos += SHA256_DIGEST_SIZE;
837 r = insert_object(map, &map_node[i]); //FIXME error check
/* pithos maps have no stored size: derive it from the object count */
839 map->size = i * block_size;
842 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
845 //FIXME cleanup on error
/* Synchronous map load: submit __load_map, park the st-thread until the
 * request completes, clear MF_MAP_LOADING, then parse the block with
 * read_map on success. */
848 static int load_map(struct peer_req *pr, struct map *map)
851 struct xseg_request *req;
852 struct peerd *peer = pr->peer;
853 req = __load_map(pr, map);
856 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
857 map->flags &= ~MF_MAP_LOADING;
858 if (req->state & XS_FAILED){
859 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
860 xseg_put_request(peer->xseg, req, pr->portno);
863 r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
864 xseg_put_request(peer->xseg, req, pr->portno);
/* Submit an open (acquire) request for the map object to the map
 * blocker and mark the map MF_MAP_OPENING.  Without MF_FORCE the
 * request is flagged XF_NOSYNC.  Returns the in-flight request or NULL. */
868 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
873 struct xseg_request *req;
874 struct peerd *peer = pr->peer;
875 struct mapperd *mapper = __get_mapperd(peer);
878 XSEGLOG2(&lc, I, "Opening map %s", m->volume);
880 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
882 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
887 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
889 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
894 char *reqtarget = xseg_get_target(peer->xseg, req);
897 strncpy(reqtarget, m->volume, req->targetlen);
899 req->size = block_size;
901 if (!(flags & MF_FORCE))
902 req->flags = XF_NOSYNC;
903 r = xseg_set_req_data(peer->xseg, req, pr);
905 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
909 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
911 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
915 r = xseg_signal(peer->xseg, p);
917 m->flags |= MF_MAP_OPENING;
918 XSEGLOG2(&lc, I, "Map %s opening", m->volume);
/* error cleanup: detach pr, return the request to the pool */
922 xseg_get_req_data(peer->xseg, req, &dummy);
924 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous open: submit __open_map, park the st-thread until it
 * completes, clear MF_MAP_OPENING, and on success mark the map
 * MF_MAP_EXCLUSIVE (we now hold the lock on the map object). */
929 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
932 struct xseg_request *req;
933 struct peerd *peer = pr->peer;
935 req = __open_map(pr, map, flags);
939 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
940 map->flags &= ~MF_MAP_OPENING;
941 err = req->state & XS_FAILED;
942 xseg_put_request(peer->xseg, req, pr->portno);
946 map->flags |= MF_MAP_EXCLUSIVE;
/* Associate (or, when mn is NULL, dissociate -- the branch line is
 * elided) a map_node with an in-flight request in mio->copyups_nodes,
 * resizing the hash on ERESIZE as elsewhere in this file. */
954 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
958 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
960 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
961 if (r == -XHASH_ERESIZE) {
962 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
963 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
966 mio->copyups_nodes = new_hashmap;
967 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
970 XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
/* removal path: drop the req -> mapnode association */
974 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
976 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
977 if (r == -XHASH_ERESIZE) {
978 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
979 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
982 mio->copyups_nodes = new_hashmap;
983 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
986 XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
/* Reverse lookup: which map_node does this in-flight request belong to? */
993 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
996 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
998 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1001 XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
/* Copy an object up before its first write.  The new object name is
 * "archip_" + hex(SHA256("<volume>_<objectidx>")).  For the well-known
 * zero block no data copy is needed: jump straight to writing the new
 * map entry.  Otherwise submit an X_COPY-style request to the data
 * blocker (bportno) and record the req -> mapnode mapping in mio. */
1005 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1007 struct mapperd *mapper = __get_mapperd(peer);
1008 struct mapper_io *mio = __get_mapper_io(pr);
1009 struct map *map = mn->map;
1014 uint32_t newtargetlen;
1015 char new_target[MAX_OBJECT_LEN + 1];
1016 unsigned char sha[SHA256_DIGEST_SIZE];
1018 strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
/* derive the deterministic new object name from volume + object index */
1020 char tmp[XSEG_MAX_TARGETLEN + 1];
1022 strncpy(tmp, map->volume, map->volumelen);
1023 sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1024 tmp[XSEG_MAX_TARGETLEN] = 0;
1025 tmplen = strlen(tmp);
1026 SHA256((unsigned char *)tmp, tmplen, sha);
1027 hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1028 newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1031 if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1032 goto copyup_zeroblock;
1034 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1035 mapper->bportno, X_ALLOC);
1037 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1040 r = xseg_prep_request(peer->xseg, req, newtargetlen,
1041 sizeof(struct xseg_request_copy));
1043 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1047 char *target = xseg_get_target(peer->xseg, req);
1048 strncpy(target, new_target, req->targetlen);
/* the copy payload names the *source* object */
1050 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1051 strncpy(xcopy->target, mn->object, mn->objectlen);
1052 xcopy->targetlen = mn->objectlen;
1055 req->size = block_size;
1057 r = xseg_set_req_data(peer->xseg, req, pr);
1059 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1062 r = __set_copyup_node(mio, req, mn);
1065 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1067 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1068 goto out_mapper_unset;
1070 xseg_signal(peer->xseg, p);
1073 mn->flags |= MF_OBJECT_COPYING;
1074 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* error unwinding: clear mio mapping, detach pr, release the request */
1078 __set_copyup_node(mio, req, NULL);
1080 xseg_get_req_data(peer->xseg, req, &dummy);
1082 xseg_put_request(peer->xseg, req, pr->portno);
1084 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
/* NOTE(review): the two adjacent string literals below concatenate
 * without a separating space ("needed.Proceeding") -- runtime string,
 * left untouched here. */
1088 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1089 "Proceeding in writing the new object in map");
1090 /* construct a tmp map_node for writing purposes */
1091 struct map_node newmn = *mn;
1092 newmn.flags = MF_OBJECT_EXIST;
1093 strncpy(newmn.object, new_target, newtargetlen);
1094 newmn.object[newtargetlen] = 0;
1095 newmn.objectlen = newtargetlen;
1096 newmn.objectidx = mn->objectidx;
1097 req = object_write(peer, pr, map, &newmn);
1098 r = __set_copyup_node(mio, req, mn);
1102 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1103 "\n\t of map %s [%llu]",
1104 mn->object, map->volume, (unsigned long long) mn->objectidx);
1107 mn->flags |= MF_OBJECT_WRITING;
1108 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/* Submit a delete request for mn's object to the data blocker, record
 * the req -> mapnode mapping in mio, and mark the node
 * MF_OBJECT_DELETING.  Returns the in-flight request or NULL. */
1112 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1115 struct peerd *peer = pr->peer;
1116 struct mapperd *mapper = __get_mapperd(peer);
1117 struct mapper_io *mio = __get_mapper_io(pr);
1118 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1119 mapper->bportno, X_ALLOC);
1120 XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1122 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1125 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1127 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1130 char *target = xseg_get_target(peer->xseg, req);
1131 strncpy(target, mn->object, req->targetlen);
1133 req->size = req->datalen;
1135 r = xseg_set_req_data(peer->xseg, req, pr);
1137 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1140 r = __set_copyup_node(mio, req, mn);
1143 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1145 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1146 goto out_mapper_unset;
1148 r = xseg_signal(peer->xseg, p);
1149 mn->flags |= MF_OBJECT_DELETING;
1150 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
/* error unwinding: clear mio mapping, detach pr, release the request */
1154 __set_copyup_node(mio, req, NULL);
1156 xseg_get_req_data(peer->xseg, req, &dummy);
1158 xseg_put_request(peer->xseg, req, pr->portno);
/* NOTE(review): failure logged at I level; other failure paths use E. */
1160 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
/* Submit a delete request for the map object itself to the map blocker
 * and mark the map MF_MAP_DELETING.  Returns the in-flight request or
 * NULL.  No mapnode is associated with a map deletion. */
1164 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1167 struct peerd *peer = pr->peer;
1168 struct mapperd *mapper = __get_mapperd(peer);
1169 struct mapper_io *mio = __get_mapper_io(pr);
1170 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1171 mapper->mbportno, X_ALLOC);
1172 XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1174 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1177 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1179 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1182 char *target = xseg_get_target(peer->xseg, req);
1183 strncpy(target, map->volume, req->targetlen);
1185 req->size = req->datalen;
1187 r = xseg_set_req_data(peer->xseg, req, pr);
1189 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1192 /* do not check return value. just make sure there is no node set */
1193 __set_copyup_node(mio, req, NULL);
1194 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1196 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1199 r = xseg_signal(peer->xseg, p);
1200 map->flags |= MF_MAP_DELETING;
1201 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
/* error cleanup: detach pr, return the request to the pool */
1205 xseg_get_req_data(peer->xseg, req, &dummy);
1207 xseg_put_request(peer->xseg, req, pr->portno);
1209 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
/* Ref-counted map_node access; the ref increment/decrement lines fall
 * outside this excerpt. */
1214 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1216 struct map_node *mn = find_object(map, index);
/* Drop a reference; on last put the node's condition variable is torn
 * down (st_cond_destroy). */
1222 static inline void put_mapnode(struct map_node *mn)
1227 st_cond_destroy(mn->cond);
/* Take a reference on the map itself (body outside this excerpt). */
1231 static inline void __get_map(struct map *map)
/* Release the map: wait out any pending per-object operations, mark
 * each node destroyed, and drop both the init ref and the ref taken by
 * get_mapnode here.
 * NOTE(review): calc_map_obj(map) is re-evaluated every loop iteration;
 * hoisting it would be a free win (map->size is stable here). */
1236 static inline void put_map(struct map *map)
1238 struct map_node *mn;
1241 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1244 for (i = 0; i < calc_map_obj(map); i++) {
1245 mn = get_mapnode(map, i);
1247 //make sure all pending operations on all objects are completed
1248 //this should never happen...
1249 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1250 mn->flags |= MF_OBJECT_DESTROYED;
1251 put_mapnode(mn); //matching mn->ref = 1 on mn init
1252 put_mapnode(mn); //matching get_mapnode above
1253 //assert mn->ref == 0;
1256 mn = find_object(map, 0);
1259 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
/* Allocate and register a fresh struct map.  MF_ARCHIP names get the
 * archip_ prefix and version 1; bare names are treated as pithos maps
 * (version 0).  The namelen check conservatively reserves room for the
 * prefix in both cases.  Returns the map or NULL (tails elided). */
1264 static struct map * create_map(struct mapperd *mapper, char *name,
1265 uint32_t namelen, uint32_t flags)
1268 if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1269 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1270 namelen, MAX_VOLUME_LEN);
1273 struct map *m = malloc(sizeof(struct map));
1275 XSEGLOG2(&lc, E, "Cannot allocate map ");
1279 if (flags & MF_ARCHIP){
1280 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1281 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1282 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1283 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1284 m->version = 1; /* keep this hardcoded for now */
1287 strncpy(m->volume, name, namelen);
1288 m->volume[namelen] = 0;
1289 m->volumelen = namelen;
1290 m->version = 0; /* version 0 should be pithos maps */
1293 m->objects = xhash_new(3, INTEGER);
1295 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1301 m->cond = st_cond_new(); //FIXME err check;
1302 r = insert_map(mapper, m);
1304 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
/* error unwinding: free the object hash, then report failure */
1311 xhash_free(m->objects);
1313 XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
/* Completion callback for object deletions issued during a map destroy.
 * Resolves the originating map_node from mio, clears DELETING, sets
 * DESTROYED, and releases the request.
 * NOTE(review): `mapper` is assigned but unused in the visible lines. */
1321 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1323 struct peerd *peer = pr->peer;
1324 struct mapperd *mapper = __get_mapperd(peer);
1326 struct mapper_io *mio = __get_mapper_io(pr);
1327 struct map_node *mn = __get_copyup_node(mio, req);
1329 __set_copyup_node(mio, req, NULL);
1331 //assert req->op = X_DELETE;
1332 //assert pr->req->op = X_DELETE only map deletions make delete requests
1333 //assert mio->del_pending > 0
1334 XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1337 if (req->state & XS_FAILED){
1341 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1342 mn, mn->object, mio, req);
1343 // assert mn->flags & MF_OBJECT_DELETING
1344 mn->flags &= ~MF_OBJECT_DELETING;
1345 mn->flags |= MF_OBJECT_DESTROYED;
1347 /* put mapnode here, matches get_mapnode on do_destroy */
1350 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1353 xseg_put_request(peer->xseg, req, pr->portno);
/* Completion callback for the two-phase copy-up: an X_COPY completion
 * triggers the map-entry write (object_write), and the X_WRITE
 * completion commits the new object name into the map_node. */
1357 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1359 struct peerd *peer = pr->peer;
1360 struct mapperd *mapper = __get_mapperd(peer);
1362 struct mapper_io *mio = __get_mapper_io(pr);
1363 struct map_node *mn = __get_copyup_node(mio, req);
1365 XSEGLOG2(&lc, E, "Cannot get map node");
1368 __set_copyup_node(mio, req, NULL);
1370 if (req->state & XS_FAILED){
1371 XSEGLOG2(&lc, E, "Req failed");
1372 mn->flags &= ~MF_OBJECT_COPYING;
1373 mn->flags &= ~MF_OBJECT_WRITING;
1376 if (req->op == X_WRITE) {
1377 char *target = xseg_get_target(peer->xseg, req);
1379 //printf("handle object write replyi\n");
/* NOTE(review): the node was already unset at the top of the function
 * -- this second __set_copyup_node(..., NULL) looks redundant. */
1380 __set_copyup_node(mio, req, NULL);
1381 //assert mn->flags & MF_OBJECT_WRITING
1382 mn->flags &= ~MF_OBJECT_WRITING;
/* NOTE(review): tmp is uninitialized and map_to_object OR-s into
 * tmp.flags -- reads indeterminate memory; harmless only because
 * tmp.flags is not used afterward.  Consider zero-initializing. */
1384 struct map_node tmp;
1385 char *data = xseg_get_data(peer->xseg, req);
1386 map_to_object(&tmp, (unsigned char *) data);
1387 mn->flags |= MF_OBJECT_EXIST;
/* NOTE(review): strict equality -- any extra flag bit (e.g. DESTROYED)
 * trips this error path; confirm that is the intent vs. a masked test. */
1388 if (mn->flags != MF_OBJECT_EXIST){
1389 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1392 //assert mn->flags & MF_OBJECT_EXIST
1393 strncpy(mn->object, tmp.object, tmp.objectlen);
1394 mn->object[tmp.objectlen] = 0;
1395 mn->objectlen = tmp.objectlen;
1396 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1400 } else if (req->op == X_COPY) {
1401 // issue write_object;
1402 mn->flags &= ~MF_OBJECT_COPYING;
1403 struct map *map = mn->map;
1405 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1409 /* construct a tmp map_node for writing purposes */
1410 char *target = xseg_get_target(peer->xseg, req);
1411 struct map_node newmn = *mn;
1412 newmn.flags = MF_OBJECT_EXIST;
1413 strncpy(newmn.object, target, req->targetlen);
1414 newmn.object[req->targetlen] = 0;
1415 newmn.objectlen = req->targetlen;
1416 newmn.objectidx = mn->objectidx;
1417 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1419 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1420 "\n\t of map %s [%llu]",
1421 mn->object, map->volume, (unsigned long long) mn->objectidx);
1424 mn->flags |= MF_OBJECT_WRITING;
/* re-associate the follow-up write request with the same map_node */
1425 __set_copyup_node (mio, xreq, mn);
1427 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1434 xseg_put_request(peer->xseg, req, pr->portno);
1439 XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1449 struct map_node *mn;
/*
 * req2objs: translate a client I/O request (offset + size on the volume)
 * into a scatterlist of backing objects, issuing copy-ups for missing
 * objects when this is a write, and build the xseg_reply_map reply in
 * place of the request data.
 * Returns 0 on success, negative on failure (per visible error paths).
 * NOTE(review): this listing is gapped — some statements, error paths
 * and closing braces of the original body are not visible here.
 */
1454 static int req2objs(struct peer_req *pr, struct map *map, int write)
1457 struct peerd *peer = pr->peer;
1458 struct mapper_io *mio = __get_mapper_io(pr);
1459 char *target = xseg_get_target(peer->xseg, pr->req);
1460 uint32_t nr_objs = calc_nr_obj(pr->req);
/* size the reply: header plus one scatterlist entry per object */
1461 uint64_t size = sizeof(struct xseg_reply_map) +
1462 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1464 uint64_t rem_size, obj_index, obj_offset, obj_size;
1465 struct map_node *mn;
1467 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1469 /* get map_nodes of request */
1470 struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1472 XSEGLOG2(&lc, E, "Cannot allocate mns");
/* first (possibly partial) object of the range */
1476 rem_size = pr->req->size;
1477 obj_index = pr->req->offset / block_size;
1478 obj_offset = pr->req->offset & (block_size -1); //modulo
1479 obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1480 mn = get_mapnode(map, obj_index);
1482 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1487 mns[idx].offset = obj_offset;
1488 mns[idx].size = obj_size;
1489 rem_size -= obj_size;
/* remaining objects are block_size aligned */
1490 while (rem_size > 0) {
1494 obj_size = (rem_size > block_size) ? block_size : rem_size;
1495 rem_size -= obj_size;
1496 mn = get_mapnode(map, obj_index);
1498 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1503 mns[idx].offset = obj_offset;
1504 mns[idx].size = obj_size;
1509 /* do a first scan and issue as many copyups as we can.
1510 * then retry and wait when an object is not ready.
1511 * this could be done better, since now we wait also on the
/* two passes: issue copy-ups eagerly, then wait on not-ready objects */
1515 for (j = 0; j < 2 && !mio->err; j++) {
1516 for (i = 0; i < (idx+1); i++) {
1519 if (mn->flags & MF_OBJECT_NOT_READY){
1522 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1523 if (mn->flags & MF_OBJECT_DESTROYED){
/* object is a CoW placeholder — copy it up before writing */
1529 if (!(mn->flags & MF_OBJECT_EXIST)) {
1530 //calc new_target, copy up object
1531 if (copyup_object(peer, mn, pr) == NULL){
1532 XSEGLOG2(&lc, E, "Error in copy up object");
1540 XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
/* block until all copy-ups issued above have completed */
1546 wait_on_pr(pr, mio->copyups > 0);
1551 XSEGLOG2(&lc, E, "Mio->err");
1555 /* resize request to fit reply */
/* save the target name; resizing invalidates the old data pointers */
1556 char buf[XSEG_MAX_TARGETLEN];
1557 strncpy(buf, target, pr->req->targetlen);
1558 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1560 XSEGLOG2(&lc, E, "Cannot resize request");
1563 target = xseg_get_target(peer->xseg, pr->req);
1564 strncpy(target, buf, pr->req->targetlen);
1566 /* structure reply */
1567 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1568 reply->cnt = nr_objs;
1569 for (i = 0; i < (idx+1); i++) {
1570 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1571 reply->segs[i].targetlen = mns[i].mn->objectlen;
1572 reply->segs[i].offset = mns[i].offset;
1573 reply->segs[i].size = mns[i].size;
/* drop the references taken by get_mapnode above
 * NOTE(review): loop bound here is idx, not idx+1 as in the reply loop —
 * confirm against the full source whether the last ref is released
 * elsewhere or this is intentional. */
1576 for (i = 0; i < idx; i++) {
1577 put_mapnode(mns[i].mn);
/*
 * do_dropcache: evict a map and all its cached map nodes from the mapper
 * cache. Waits for pending per-object operations, marks every node
 * DESTROYED, then removes the map from the hash and drops the creation
 * reference so the map itself is freed.
 * NOTE(review): listing is gapped; some lines of the body are missing.
 */
1584 static int do_dropcache(struct peer_req *pr, struct map *map)
1586 struct map_node *mn;
1587 struct peerd *peer = pr->peer;
1588 struct mapperd *mapper = __get_mapperd(peer);
1590 XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
/* flag the map so concurrent users see the cache drop in progress */
1591 map->flags |= MF_MAP_DROPPING_CACHE;
1592 for (i = 0; i < calc_map_obj(map); i++) {
1593 mn = get_mapnode(map, i);
1595 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1596 //make sure all pending operations on all objects are completed
1597 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1598 mn->flags |= MF_OBJECT_DESTROYED;
1603 map->flags &= ~MF_MAP_DROPPING_CACHE;
1604 map->flags |= MF_MAP_DESTROYED;
1605 remove_map(mapper, map);
1606 XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1607 put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
/*
 * do_info: answer an X_INFO request by writing the map's size into the
 * xseg_reply_info structure in the request data.
 * NOTE(review): the return statement is not visible in this gapped listing.
 */
1611 static int do_info(struct peer_req *pr, struct map *map)
1613 struct peerd *peer = pr->peer;
1614 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1615 xinfo->size = map->size;
/*
 * do_close: release exclusive access to a map (if held) and then drop it
 * from the cache. If closing fails and the map was not already deleted,
 * the cache is deliberately kept (see inline comment).
 */
1620 static int do_close(struct peer_req *pr, struct map *map)
1622 if (map->flags & MF_MAP_EXCLUSIVE){
1623 /* do not drop cache if close failed and map not deleted */
1624 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
1627 return do_dropcache(pr, map);
/*
 * do_destroy: delete a map and all of its backing objects.
 * Requires exclusive access (bails out otherwise). First deletes the map
 * object itself, then walks every map node issuing object deletions,
 * throttled so no more than nr_ops deletions are in flight; completion is
 * driven by deletion_cb. Finally closes the map / drops the cache.
 * NOTE(review): listing is gapped; some error paths and braces missing.
 */
1630 static int do_destroy(struct peer_req *pr, struct map *map)
1633 struct peerd *peer = pr->peer;
1634 struct mapper_io *mio = __get_mapper_io(pr);
1635 struct map_node *mn;
1636 struct xseg_request *req;
/* destroying without exclusive access would race other mappers */
1638 if (!(map->flags & MF_MAP_EXCLUSIVE))
1641 XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1642 req = __delete_map(pr, map);
/* synchronously wait for the map deletion to finish or fail */
1645 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1646 if (req->state & XS_FAILED){
1647 xseg_put_request(peer->xseg, req, pr->portno);
1648 map->flags &= ~MF_MAP_DELETING;
1651 xseg_put_request(peer->xseg, req, pr->portno);
1653 uint64_t nr_obj = calc_map_obj(map);
/* object deletions complete asynchronously through deletion_cb */
1654 mio->cb = deletion_cb;
1655 mio->del_pending = 0;
1657 for (i = 0; i < nr_obj; i++){
1659 /* throttle pending deletions
1660 * this should be nr_ops of the blocker, but since we don't know
1661 * that, we assume based on our own nr_ops
1663 wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
1665 mn = get_mapnode(map, i);
1668 if (mn->flags & MF_OBJECT_DESTROYED){
/* CoW placeholders have no backing object to delete */
1672 if (!(mn->flags & MF_OBJECT_EXIST)){
1673 mn->flags |= MF_OBJECT_DESTROYED;
1678 // make sure all pending operations on all objects are completed
1679 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1681 req = __delete_object(pr, mn);
1688 /* do not put_mapnode here. cb does that */
/* drain any deletions still in flight before declaring success */
1691 wait_on_pr(pr, mio->del_pending > 0);
1694 map->flags &= ~MF_MAP_DELETING;
1695 map->flags |= MF_MAP_DELETED;
1696 XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1697 return do_close(pr, map);
/*
 * do_mapr: serve an X_MAPR (map-read) request by delegating to
 * req2objs(write=0), then log the resulting scatterlist at debug level.
 * NOTE(review): listing is gapped; return statements are not visible.
 */
1700 static int do_mapr(struct peer_req *pr, struct map *map)
1702 struct peerd *peer = pr->peer;
1703 int r = req2objs(pr, map, 0);
1705 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1707 (unsigned long long) pr->req->offset,
1708 (unsigned long long) (pr->req->offset + pr->req->size));
1711 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1713 (unsigned long long) pr->req->offset,
1714 (unsigned long long) (pr->req->offset + pr->req->size));
1715 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1716 (unsigned long long) pr->req->offset,
1717 (unsigned long long) pr->req->size);
/* dump each reply segment for debugging; buf NUL-terminated manually */
1718 char buf[XSEG_MAX_TARGETLEN+1];
1719 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1721 for (i = 0; i < reply->cnt; i++) {
1722 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1723 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1724 buf[reply->segs[i].targetlen] = 0;
1725 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1726 (unsigned long long) reply->segs[i].offset,
1727 (unsigned long long) reply->segs[i].size);
/*
 * do_mapw: serve an X_MAPW (map-write) request by delegating to
 * req2objs(write=1) — which also triggers copy-ups — then log the
 * resulting scatterlist at debug level. Mirrors do_mapr.
 * NOTE(review): listing is gapped; return statements are not visible.
 */
1732 static int do_mapw(struct peer_req *pr, struct map *map)
1734 struct peerd *peer = pr->peer;
1735 int r = req2objs(pr, map, 1);
1737 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1739 (unsigned long long) pr->req->offset,
1740 (unsigned long long) (pr->req->offset + pr->req->size));
1743 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1745 (unsigned long long) pr->req->offset,
1746 (unsigned long long) (pr->req->offset + pr->req->size));
1747 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1748 (unsigned long long) pr->req->offset,
1749 (unsigned long long) pr->req->size);
/* dump each reply segment for debugging; buf NUL-terminated manually */
1750 char buf[XSEG_MAX_TARGETLEN+1];
1751 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1753 for (i = 0; i < reply->cnt; i++) {
1754 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1755 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1756 buf[reply->segs[i].targetlen] = 0;
1757 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1758 (unsigned long long) reply->segs[i].offset,
1759 (unsigned long long) reply->segs[i].size);
//here map is the parent map
/*
 * do_clone: create a new clone map from parent `map`. The clone target
 * name comes from the request; the clone must not already exist (open +
 * load are used as an existence check). Objects covered by the parent
 * reuse the parent's object names (CoW); objects beyond the parent's
 * size are initialized to the zero block. The clone map is then written
 * out and closed.
 * NOTE(review): listing is gapped; several error/cleanup paths missing.
 */
1765 static int do_clone(struct peer_req *pr, struct map *map)
1768 char buf[XSEG_MAX_TARGETLEN];
1769 struct peerd *peer = pr->peer;
1770 struct mapperd *mapper = __get_mapperd(peer);
1771 char *target = xseg_get_target(peer->xseg, pr->req);
1772 struct map *clonemap;
1773 struct xseg_request_clone *xclone =
1774 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1776 XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1778 clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
1782 /* open map to get exclusive access to map */
1783 r = open_map(pr, clonemap, 0);
1785 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
1786 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
/* if the map loads, the target already exists — refuse to overwrite */
1789 r = load_map(pr, clonemap);
1791 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
/* size == -1 means "inherit parent size" */
1795 if (xclone->size == -1)
1796 clonemap->size = map->size;
1798 clonemap->size = xclone->size;
1799 if (clonemap->size < map->size){
1800 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1801 "\n\t for requested clone %s",
1802 (unsigned long long) xclone->size,
1803 (unsigned long long) map->size, clonemap->volume);
1806 if (clonemap->size > MAX_VOLUME_SIZE) {
1807 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
1808 "\n\t for volume %s",
1809 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
1813 //alloc and init map_nodes
1814 unsigned long c = clonemap->size/block_size + 1;
1815 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1820 for (i = 0; i < clonemap->size/block_size + 1; i++) {
/* within the parent's range: share the parent's object name (CoW) */
1821 struct map_node *mn = get_mapnode(map, i);
1823 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1824 map_nodes[i].objectlen = mn->objectlen;
/* beyond the parent: point at the well-known zero block */
1827 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1828 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1830 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1831 map_nodes[i].flags = 0;
1832 map_nodes[i].objectidx = i;
1833 map_nodes[i].map = clonemap;
1834 map_nodes[i].ref = 1;
1835 map_nodes[i].waiters = 0;
1836 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1837 r = insert_object(clonemap, &map_nodes[i]);
1839 XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
/* persist the freshly built clone map */
1844 r = write_map(pr, clonemap);
1846 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1849 do_close(pr, clonemap);
1853 do_close(pr, clonemap);
/*
 * open_load_map: optionally open the map exclusively (MF_EXCLUSIVE;
 * MF_FORCE apparently tolerates open failure), then load its contents.
 * On load failure, an opened map is presumably closed again.
 * NOTE(review): listing is gapped; most of the body is not visible.
 */
1857 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1860 if (flags & MF_EXCLUSIVE){
1861 r = open_map(pr, map, flags);
1863 if (flags & MF_FORCE){
1870 r = load_map(pr, map);
1871 if (r < 0 && opened){
/*
 * get_map: look up a map by name in the mapper cache; on a miss, and only
 * when MF_LOAD is requested, create a fresh map struct and open/load it
 * (dropping the cache entry again on failure). A cached map marked
 * MF_MAP_DESTROYED gets special handling (not fully visible here).
 * NOTE(review): listing is gapped; return paths are not visible.
 */
1877 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1881 struct peerd *peer = pr->peer;
1882 struct mapperd *mapper = __get_mapperd(peer);
1883 struct map *map = find_map_len(mapper, name, namelen, flags);
1885 if (flags & MF_LOAD){
1886 map = create_map(mapper, name, namelen, flags);
1889 r = open_load_map(pr, map, flags);
1891 do_dropcache(pr, map);
1897 } else if (map->flags & MF_MAP_DESTROYED){
/*
 * map_action: generic driver for all map operations. Resolves the map via
 * get_map (waiting while the map is busy), invokes the supplied action
 * callback, and drops the cache afterwards unless the map is held
 * exclusively (a non-exclusive cached copy may be stale).
 * NOTE(review): listing is gapped; return paths are not visible.
 */
1905 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1906 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1908 //struct peerd *peer = pr->peer;
1911 map = get_map(pr, name, namelen, flags);
1914 if (map->flags & MF_MAP_NOT_READY){
1915 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1919 int r = action(pr, map);
1920 //always drop cache if map not read exclusively
1921 if (!(map->flags & MF_MAP_EXCLUSIVE))
1922 do_dropcache(pr, map);
/*
 * handle_info: st_thread entry point for X_INFO — runs do_info through
 * map_action on the request's target volume.
 */
1928 void * handle_info(struct peer_req *pr)
1930 struct peerd *peer = pr->peer;
1931 char *target = xseg_get_target(peer->xseg, pr->req);
1932 int r = map_action(do_info, pr, target, pr->req->targetlen,
/*
 * handle_clone: st_thread entry point for X_CLONE.
 * Two cases: if a snapshot/parent target is given, clone from it via
 * map_action(do_clone, ...); otherwise create a brand-new empty volume of
 * the requested size, populated entirely with zero-block objects, write
 * it out and close it.
 * NOTE(review): listing is gapped; error/cleanup paths are incomplete.
 */
1942 void * handle_clone(struct peer_req *pr)
1945 struct peerd *peer = pr->peer;
1946 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1952 if (xclone->targetlen){
1953 /* if snap was defined */
1954 //support clone only from pithos
1955 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1958 /* else try to create a new volume */
1959 XSEGLOG2(&lc, I, "Creating volume");
1961 XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
1965 if (xclone->size > MAX_VOLUME_SIZE) {
1966 XSEGLOG2(&lc, E, "Requested size %llu > max volume "
1967 "size %llu", xclone->size, MAX_VOLUME_SIZE);
1973 char *target = xseg_get_target(peer->xseg, pr->req);
1975 //create a new empty map of size
1976 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
1981 /* open map to get exclusive access to map */
1982 r = open_map(pr, map, 0);
1984 XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
1985 XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
1986 do_dropcache(pr, map);
/* existence check: a successful load means the volume already exists */
1990 r = load_map(pr, map);
1992 XSEGLOG2(&lc, E, "Map exists %s", map->volume);
1997 map->size = xclone->size;
1998 //populate_map with zero objects;
/* round object count up to cover a partial trailing block */
1999 uint64_t nr_objs = xclone->size / block_size;
2000 if (xclone->size % block_size)
2003 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2011 for (i = 0; i < nr_objs; i++) {
2012 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2013 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2014 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2015 map_nodes[i].flags = 0;
2016 map_nodes[i].objectidx = i;
2017 map_nodes[i].map = map;
2018 map_nodes[i].ref = 1;
2019 map_nodes[i].waiters = 0;
2020 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2021 r = insert_object(map, &map_nodes[i]);
2028 r = write_map(pr, map);
2030 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2034 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2036 do_close(pr, map); //drop cache here for consistency
/*
 * handle_mapr: st_thread entry point for X_MAPR — runs do_mapr through
 * map_action, loading the map and taking exclusive access if possible.
 */
2047 void * handle_mapr(struct peer_req *pr)
2049 struct peerd *peer = pr->peer;
2050 char *target = xseg_get_target(peer->xseg, pr->req);
2051 int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2052 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/*
 * handle_mapw: st_thread entry point for X_MAPW — runs do_mapw through
 * map_action. Unlike mapr, MF_FORCE is added: writes must obtain
 * exclusive access.
 */
2061 void * handle_mapw(struct peer_req *pr)
2063 struct peerd *peer = pr->peer;
2064 char *target = xseg_get_target(peer->xseg, pr->req);
2065 int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2066 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2071 XSEGLOG2(&lc, D, "Ta: %d", ta);
/*
 * handle_destroy: st_thread entry point for X_DELETE — runs do_destroy
 * through map_action. Exclusive access is requested but not forced;
 * do_destroy itself verifies that it was actually obtained.
 */
2076 void * handle_destroy(struct peer_req *pr)
2078 struct peerd *peer = pr->peer;
2079 char *target = xseg_get_target(peer->xseg, pr->req);
2080 /* request EXCLUSIVE access, but do not force it.
2081 * check if succeeded on do_destroy
2083 int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2084 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/*
 * handle_close: st_thread entry point for X_CLOSE — runs do_close through
 * map_action. MF_LOAD is deliberately omitted: closing must not load a
 * map that is not already cached.
 */
2093 void * handle_close(struct peer_req *pr)
2095 struct peerd *peer = pr->peer;
2096 char *target = xseg_get_target(peer->xseg, pr->req);
2097 //here we do not want to load
2098 int r = map_action(do_close, pr, target, pr->req->targetlen,
2099 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
/*
 * dispatch_accepted: route a newly accepted request to its handler.
 * Maps the xseg opcode to one of the handle_* entry points and spawns a
 * state thread (st_thread_create) to run it, so blocking waits inside the
 * handlers do not stall the dispatcher.
 * NOTE(review): listing is gapped; the unknown-op/return handling is
 * only partially visible.
 */
2108 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2109 struct xseg_request *req)
2111 //struct mapperd *mapper = __get_mapperd(peer);
2112 struct mapper_io *mio = __get_mapper_io(pr);
2113 void *(*action)(struct peer_req *) = NULL;
2115 mio->state = ACCEPTED;
2118 switch (pr->req->op) {
2119 /* primary xseg operations of mapper */
2120 case X_CLONE: action = handle_clone; break;
2121 case X_MAPR: action = handle_mapr; break;
2122 case X_MAPW: action = handle_mapw; break;
2123 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2124 case X_INFO: action = handle_info; break;
2125 case X_DELETE: action = handle_destroy; break;
2126 case X_CLOSE: action = handle_close; break;
2127 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/* run the handler in its own state thread */
2132 st_thread_create(action, pr, 0, 0);
/*
 * dispatch: top-level peer dispatch hook. New accepted requests go to
 * dispatch_accepted; other reasons (replies, etc.) are handled in the
 * portion of the body not visible in this gapped listing.
 */
2138 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2139 enum dispatch_reason reason)
2141 struct mapperd *mapper = __get_mapperd(peer);
2143 struct mapper_io *mio = __get_mapper_io(pr);
2147 if (reason == dispatch_accept)
2148 dispatch_accepted(peer, pr, req);
/*
 * custom_peer_init: mapper-specific peer initialization.
 * Allocates the mapperd state and a per-request mapper_io, parses the
 * -bp (blocker port), -mbp (map blocker port) and -t (thread count,
 * restricted to 1) arguments, raises scheduling priority to SCHED_FIFO,
 * and tunes xseg queue sizes.
 * NOTE(review): allocations are unchecked (acknowledged by the existing
 * FIXME); listing is gapped, so some setup lines are missing.
 */
2159 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2163 //FIXME error checks
2164 struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2165 peer->priv = mapperd;
2167 mapper->hashmaps = xhash_new(3, STRING);
/* one mapper_io per peer request slot */
2169 for (i = 0; i < peer->nr_ops; i++) {
2170 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2171 mio->copyups_nodes = xhash_new(3, INTEGER);
2175 peer->peer_reqs[i].priv = mio;
2178 for (i = 0; i < argc; i++) {
2179 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2180 mapper->bportno = atoi(argv[i+1]);
2184 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2185 mapper->mbportno = atoi(argv[i+1]);
2189 /* enforce only one thread */
2190 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2191 int t = atoi(argv[i+1]);
2193 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
/* run at real-time priority; return value is not checked here */
2201 const struct sched_param param = { .sched_priority = 99 };
2202 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2203 /* FIXME maybe place it in peer
2204 * should be done for each port (sportno to eportno)
2206 xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2207 xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
/* FIXME this should not be here */
/*
 * wait_reply: synchronously poll every owned port for the reply to
 * expected_req, bouncing or discarding any stale/foreign requests that
 * arrive in the meantime, and sleeping on the xseg signal between polls.
 * Used only during finalization, outside the normal dispatch loop.
 * NOTE(review): listing is gapped; the loop structure and return are
 * only partially visible.
 */
2216 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2218 struct xseg *xseg = peer->xseg;
2219 xport portno_start = peer->portno_start;
2220 xport portno_end = peer->portno_end;
2221 struct peer_req *pr;
2224 struct xseg_request *req, *received;
2225 xseg_prepare_wait(xseg, portno_start);
2227 XSEGLOG2(&lc, D, "Attempting to check for reply");
2231 for (i = portno_start; i <= portno_end; i++) {
2232 received = xseg_receive(xseg, i, 0);
2235 r = xseg_get_req_data(xseg, received, (void **) &pr);
/* anything that is not our expected reply is treated as stale */
2236 if (r < 0 || !pr || received != expected_req){
2237 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2238 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2240 XSEGLOG2(&lc, W, "Could not respond stale request");
2241 xseg_put_request(xseg, received, portno_start);
2244 xseg_signal(xseg, p);
2247 xseg_cancel_wait(xseg, portno_start);
/* nothing received this round: block (up to 1s) for a signal */
2253 xseg_wait_signal(xseg, 1000000UL);
/*
 * custom_peer_finalize: shutdown hook. Iterates every cached map and, for
 * those held with exclusive access, issues a close and waits for the
 * reply via wait_reply so locks are released before the peer exits.
 * NOTE(review): listing is gapped; loop/brace structure is incomplete.
 */
2258 void custom_peer_finalize(struct peerd *peer)
2260 struct mapperd *mapper = __get_mapperd(peer);
2261 struct peer_req *pr = alloc_peer_req(peer);
2263 XSEGLOG2(&lc, E, "Cannot get peer request");
2268 struct xseg_request *req;
2271 xhash_iter_init(mapper->hashmaps, &it);
2272 while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2273 map = (struct map *)val;
/* only exclusively-held maps have a lock worth releasing */
2274 if (!(map->flags & MF_MAP_EXCLUSIVE))
2276 req = __close_map(pr, map);
2279 wait_reply(peer, req);
2280 if (!(req->state & XS_SERVED))
2281 XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2282 map->flags &= ~MF_MAP_CLOSING;
2283 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * print_obj: debug helper — dump one map node (index, object name,
 * name length, EXIST flag) to stderr.
 */
2290 void print_obj(struct map_node *mn)
2292 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
2293 (unsigned long long) mn->objectidx, mn->object,
2294 (unsigned int) mn->objectlen,
2295 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * print_map: debug helper — dump a map header (volume name, size,
 * object count) to stderr, then each object via find_object. Bails out
 * on absurd object counts as a guard against corrupted sizes.
 * NOTE(review): listing is gapped; per-object print lines are missing.
 */
2298 void print_map(struct map *m)
/* object count, rounded up for a partial trailing block */
2300 uint64_t nr_objs = m->size/block_size;
2301 if (m->size % block_size)
2303 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
2304 m->volume, m->volumelen,
2305 (unsigned long long) m->size,
2306 (unsigned long long) nr_objs,
2309 struct map_node *mn;
2310 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2312 for (i = 0; i < nr_objs; i++) {
2313 mn = find_object(m, i);
2315 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2323 void test_map(struct peerd *peer)
2326 //struct sha256_ctx sha256ctx;
2327 unsigned char buf[SHA256_DIGEST_SIZE];
2328 char buf_new[XSEG_MAX_TARGETLEN + 20];
2329 struct map *m = malloc(sizeof(struct map));
2330 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2331 m->volume[XSEG_MAX_TARGETLEN] = 0;
2332 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2333 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2334 m->volumelen = XSEG_MAX_TARGETLEN;
2335 m->size = 100*block_size;
2336 m->objects = xhash_new(3, INTEGER);
2337 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2338 for (i = 0; i < 100; i++) {
2339 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2340 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2342 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2343 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2345 map_node[i].objectidx = i;
2346 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2347 map_node[i].flags = MF_OBJECT_EXIST;
2348 ret = insert_object(m, &map_node[i]);
2351 char *data = malloc(block_size);
2352 mapheader_to_map(m, data);
2353 uint64_t pos = mapheader_size;
2355 for (i = 0; i < 100; i++) {
2356 map_node = find_object(m, i);
2358 printf("no object node %d \n", i);
2361 object_to_map(data+pos, map_node);
2362 pos += objectsize_in_map;
2366 struct map *m2 = malloc(sizeof(struct map));
2367 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2368 m->volume[XSEG_MAX_TARGETLEN] = 0;
2369 m->volumelen = XSEG_MAX_TARGETLEN;
2371 m2->objects = xhash_new(3, INTEGER);
2372 ret = read_map(peer, m2, data);
2375 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2377 while (sum < block_size) {
2378 r = write(fd, data + sum, block_size -sum);
2381 printf("write error\n");
2387 map_node = find_object(m, 0);