8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
16 #include <sys/syscall.h>
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 #define MF_LOAD (1 << 0)
20 #define MF_EXCLUSIVE (1 << 1)
21 #define MF_FORCE (1 << 2)
23 #define SHA256_DIGEST_SIZE 32
24 /* hex representation of sha256 value takes up double the sha256 size */
25 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
27 #define block_size (1<<22) //FIXME this should be defined here?
28 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
31 #define MF_OBJECT_EXIST (1 << 0)
32 #define MF_OBJECT_COPYING (1 << 1)
33 #define MF_OBJECT_WRITING (1 << 2)
34 #define MF_OBJECT_DELETING (1 << 3)
35 #define MF_OBJECT_DELETED (1 << 4)
36 #define MF_OBJECT_DESTROYED (1 << 5)
38 #define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
/* NOTE(review): the grammar slip in magic_string ("This a") is load-bearing.
 * magic_sha256 is computed from this exact byte sequence and serves as the
 * on-disk map-format magic — do NOT "fix" the typo, it would break every
 * existing map. */
40 char *magic_string = "This a magic string. Please hash me";
41 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
42 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
44 //dispatch_internal mapper states
/* Callback type invoked when a blocker request completes for a peer_req. */
53 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
/* Field of struct map_node (struct header not visible in this chunk). */
59 char object[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
/* struct map state flags: loading/writing/etc. mark in-flight operations;
 * MF_MAP_NOT_READY is the union of states during which new work on the map
 * must be queued instead of started. Note CLOSING and EXCLUSIVE are
 * deliberately not part of NOT_READY. */
66 #define MF_MAP_LOADING (1 << 0)
67 #define MF_MAP_DESTROYED (1 << 1)
68 #define MF_MAP_WRITING (1 << 2)
69 #define MF_MAP_DELETING (1 << 3)
70 #define MF_MAP_DROPPING_CACHE (1 << 4)
71 #define MF_MAP_EXCLUSIVE (1 << 5)
72 #define MF_MAP_OPENING (1 << 6)
73 #define MF_MAP_CLOSING (1 << 7)
75 #define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
/* Cooperative-scheduling helpers built on the st_* userspace thread library.
 * wait_on_pr / wait_on_mapnode / wait_on_map: loop while __condition__ holds,
 * parking the current thread on the respective condition variable on each
 * iteration (spurious wakeups are therefore handled by the while loop).
 * signal_pr / signal_map wake one waiter; signal_mapnode wakes all waiters
 * via st_cond_broadcast. signal_pr additionally flips the pr's `active` flag
 * so a pr is only signalled once while parked.
 * NOTE(review): `ta` appears to be a global thread-accounting counter used
 * for logging — not visible in this chunk, confirm before relying on it. */
78 #define wait_on_pr(__pr, __condition__) \
79 while (__condition__){ \
81 __get_mapper_io(pr)->active = 0;\
82 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
83 st_cond_wait(__pr->cond); \
86 #define wait_on_mapnode(__mn, __condition__) \
87 while (__condition__){ \
90 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
91 st_cond_wait(__mn->cond); \
94 #define wait_on_map(__map, __condition__) \
95 while (__condition__){ \
98 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
99 st_cond_wait(__map->cond); \
102 #define signal_pr(__pr) \
104 if (!__get_mapper_io(pr)->active){\
106 XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta); \
107 __get_mapper_io(pr)->active = 1;\
108 st_cond_signal(__pr->cond); \
112 #define signal_map(__map) \
114 if (__map->waiters) { \
116 XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u", __map, __map->volume, __map->waiters, ta); \
118 st_cond_signal(__map->cond); \
122 #define signal_mapnode(__mn) \
124 if (__mn->waiters) { \
125 ta += __mn->waiters; \
126 XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
128 st_cond_broadcast(__mn->cond); \
/* Fields of struct map (struct header not visible in this chunk). */
137 char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138 xhash_t *objects; /* obj_index --> map_node */
/* Fields of struct mapperd (per-daemon state). */
145 xport bportno; /* blocker that accesses data */
146 xport mbportno; /* blocker that accesses maps */
147 xhash_t *hashmaps; // hash_function(target) --> struct map
/* Fields of struct mapper_io (per-peer_req state). */
151 volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
152 xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153 struct map_node *copyup_node; /* presumably last/current copyup target — TODO confirm */
154 volatile int err; /* error flag */
155 volatile uint64_t del_pending; /* presumably count of in-flight deletions — TODO confirm */
159 enum mapper_state state;
164 struct mapperd *mapper; /* back pointer to owning daemon state — TODO confirm */
166 void print_map(struct map *m);
/* Daemon-wide mapper state is stashed in peer->priv. */
172 static inline struct mapperd * __get_mapperd(struct peerd *peer)
174 return (struct mapperd *) peer->priv;
/* Per-request mapper io state is stashed in pr->priv. */
177 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
179 return (struct mapper_io *) pr->priv;
/* Number of objects needed to back map->size bytes: size / block_size,
 * rounded up when a partial trailing block exists (the increment is on a
 * line not visible in this chunk). */
182 static inline uint64_t calc_map_obj(struct map *map)
186 uint64_t nr_objs = map->size / block_size;
187 if (map->size % block_size)
/* How many block_size objects does this request span?  The first object may
 * be entered at a non-zero offset (block_size is a power of two, so
 * `& (block_size - 1)` is the modulo); subsequent objects are consumed in
 * full block_size chunks until rem_size is exhausted. */
192 static uint32_t calc_nr_obj(struct xseg_request *req)
195 uint64_t rem_size = req->size;
196 uint64_t obj_offset = req->offset & (block_size -1); //modulo
197 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198 rem_size -= obj_size;
199 while (rem_size > 0) {
200 obj_size = (rem_size > block_size) ? block_size : rem_size;
201 rem_size -= obj_size;
209 * Maps handling functions
211 //FIXME assert targetlen > 0
/* Look up a cached map by (non NUL-terminated) target name.
 * Returns the map or NULL (lookup-failure path not visible in this chunk).
 * NOTE(review): strncpy with exactly `targetlen` does not NUL-terminate;
 * the terminating `buf[targetlen] = 0;` must exist on a line not visible
 * here — verify, since buf is used both as a hash key and in the log. */
214 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
217 struct map *m = NULL;
218 char buf[XSEG_MAX_TARGETLEN+1];
219 //assert targetlen <= XSEG_MAX_TARGETLEN
220 strncpy(buf, target, targetlen);
222 XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
223 r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
/* Insert a map into the daemon hash, keyed by volume name.  Refuses
 * duplicates.  On -XHASH_ERESIZE the table is grown and the insert retried
 * until it fits (or resize fails). */
230 static int insert_map(struct mapperd *mapper, struct map *map)
234 if (find_map(mapper, map->volume, map->volumelen)){
235 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
239 XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
240 map->volume, strlen(map->volume), (unsigned long) map);
241 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242 while (r == -XHASH_ERESIZE) {
243 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
246 XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247 (unsigned long long) shift);
250 mapper->hashmaps = new_hashmap;
251 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/* Remove a map from the daemon hash.  Mirror of insert_map: on
 * -XHASH_ERESIZE the table is shrunk and the delete retried. */
257 static int remove_map(struct mapperd *mapper, struct map *map)
261 //assert no pending pr on map
263 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264 while (r == -XHASH_ERESIZE) {
265 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
268 XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269 (unsigned long long) shift);
272 mapper->hashmaps = new_hashmap;
273 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/* Submit an asynchronous "close" request for this map to the maps blocker
 * (mbportno).  Returns the in-flight request on success, NULL on failure
 * (the trailing lines here form the shared error-cleanup path: detach the
 * pr from the request and put it back).  The request op assignment is on a
 * line not visible in this chunk — presumably X_RELEASE; confirm. */
279 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
283 struct peerd *peer = pr->peer;
284 struct xseg_request *req;
285 struct mapperd *mapper = __get_mapperd(peer);
288 XSEGLOG2(&lc, I, "Closing map %s", map->volume);
290 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
292 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
297 r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
299 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
304 char *reqtarget = xseg_get_target(peer->xseg, req);
307 strncpy(reqtarget, map->volume, req->targetlen);
309 req->size = block_size;
311 r = xseg_set_req_data(peer->xseg, req, pr);
313 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
317 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
319 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
323 r = xseg_signal(peer->xseg, p);
325 XSEGLOG2(&lc, I, "Map %s closing", map->volume);
329 xseg_get_req_data(peer->xseg, req, &dummy);
331 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous close: mark MF_MAP_CLOSING, submit via __close_map, then
 * park this green thread until the request is served or failed.  Returns
 * the failure state of the request (0 on success). */
336 static int close_map(struct peer_req *pr, struct map *map)
339 struct xseg_request *req;
340 struct peerd *peer = pr->peer;
342 map->flags |= MF_MAP_CLOSING;
343 req = __close_map(pr, map);
346 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347 map->flags &= ~MF_MAP_CLOSING;
348 err = req->state & XS_FAILED;
349 xseg_put_request(peer->xseg, req, pr->portno);
/* Resolve a target name to a map: return it from cache if ready, queue the
 * pr on the map's pending queue if the map is busy, otherwise fall through
 * to opening/loading it from storage.
 * NOTE(review): open_map is called here with five arguments but defined
 * below with two (pr, map) — one of the two signatures is from another
 * revision; confirm which prototype is current. */
356 static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
357 char *target, uint32_t targetlen, struct map **m)
359 struct mapperd *mapper = __get_mapperd(peer);
361 *m = find_map(mapper, target, targetlen);
363 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364 if ((*m)->flags & MF_MAP_NOT_READY) {
365 __xq_append_tail(&(*m)->pending, (xqindex) pr);
366 XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
368 //} else if ((*m)->flags & MF_MAP_DESTROYED){
372 XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
376 r = open_map(peer, pr, target, targetlen, 0);
383 * Object handling functions
/* Look up the map_node for a given object index in map->objects. */
386 struct map_node *find_object(struct map *map, uint64_t obj_index)
389 int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/* Insert a map_node keyed by its objectidx, growing the hash on demand.
 * NOTE(review): unlike insert_map, the xhash_resize result is assigned
 * straight into map->objects here — the NULL check must live on a line not
 * visible in this chunk; verify it exists, else a failed resize loses the
 * table. */
395 static int insert_object(struct map *map, struct map_node *mn)
397 //FIXME no find object first
398 int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399 if (r == -XHASH_ERESIZE) {
400 unsigned long shift = xhash_grow_size_shift(map->objects);
401 map->objects = xhash_resize(map->objects, shift, NULL);
404 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
411 * map read/write functions
/* Convert a raw 32-byte pithos-format hash into a map_node: the object name
 * becomes the 64-char hex encoding of the hash, and the object is marked as
 * existing (pithos objects are archipelago-external, never copied up from
 * scratch). */
413 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
416 //hexlify sha256 value
417 for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418 sprintf(mn->object+2*i, "%02x", buf[i]);
421 mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422 mn->objectlen = SHA256_DIGEST_SIZE * 2;
423 mn->flags = MF_OBJECT_EXIST;
/* Parse one on-disk object record (objectsize_in_map bytes): byte 0 is the
 * "exists" flag, the rest is the object name (padded to XSEG_MAX_TARGETLEN).
 * NOTE(review): this ORs into mn->flags, so callers must pre-initialize
 * flags — read_map uses calloc'd nodes, but copyup_cb parses into a
 * stack-allocated `struct map_node tmp` whose flags look uninitialized;
 * verify (the conditional handling of buf[0] is on lines not visible
 * here). */
426 static inline void map_to_object(struct map_node *mn, char *buf)
431 mn->flags |= MF_OBJECT_EXIST;
432 memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433 mn->object[XSEG_MAX_TARGETLEN] = 0;
434 mn->objectlen = strlen(mn->object);
/* Serialize a map_node into its on-disk record — exact inverse of
 * map_to_object: existence byte, name, then zero padding so records are a
 * fixed objectsize_in_map bytes. */
437 static inline void object_to_map(char* buf, struct map_node *mn)
439 buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440 memcpy(buf+1, mn->object, mn->objectlen);
441 memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
/* Serialize the map header: magic sha256 followed by the 64-bit volume
 * size — matches mapheader_size. */
444 static inline void mapheader_to_map(struct map *m, char *buf)
447 memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448 pos += SHA256_DIGEST_SIZE;
449 memcpy(buf + pos, &m->size, sizeof(m->size));
450 pos += sizeof(m->size);
/* Asynchronously write a single object's on-disk record at its slot inside
 * the map file (target = the map volume, offset = header + idx * record).
 * Goes through the maps blocker (mbportno).  Returns the in-flight request,
 * or NULL after detaching/putting the request on any failure (the trailing
 * lines are the shared error path). */
454 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
455 struct map *map, struct map_node *mn)
458 struct mapperd *mapper = __get_mapperd(peer);
459 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460 mapper->mbportno, X_ALLOC);
462 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
464 mn->object, map->volume, (unsigned long long) mn->objectidx);
467 int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
469 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
471 mn->object, map->volume, (unsigned long long) mn->objectidx);
474 char *target = xseg_get_target(peer->xseg, req);
475 strncpy(target, map->volume, req->targetlen);
476 req->size = objectsize_in_map;
477 req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
479 char *data = xseg_get_data(peer->xseg, req);
480 object_to_map(data, mn);
482 r = xseg_set_req_data(peer->xseg, req, pr);
484 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
486 mn->object, map->volume, (unsigned long long) mn->objectidx);
489 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
491 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
493 mn->object, map->volume, (unsigned long long) mn->objectidx);
496 r = xseg_signal(peer->xseg, p);
498 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
500 XSEGLOG2(&lc, I, "Writing object %s \n\t"
502 mn->object, map->volume, (unsigned long long) mn->objectidx);
507 xseg_get_req_data(peer->xseg, req, &dummy);
509 xseg_put_request(peer->xseg, req, pr->portno);
511 XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
513 mn->object, map->volume, (unsigned long long) mn->objectidx);
/* Asynchronously flush the whole map to storage: header followed by one
 * record per object (all objects must already be cached — missing ones are
 * an error).  Sets MF_MAP_WRITING on successful submit; returns the
 * in-flight request or NULL via the shared error path at the end. */
517 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
520 struct peerd *peer = pr->peer;
521 struct mapperd *mapper = __get_mapperd(peer);
523 uint64_t i, pos, max_objidx = calc_map_obj(map);
524 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
525 mapper->mbportno, X_ALLOC);
527 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
530 int r = xseg_prep_request(peer->xseg, req, map->volumelen,
531 mapheader_size + max_objidx * objectsize_in_map);
533 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
536 char *target = xseg_get_target(peer->xseg, req);
537 strncpy(target, map->volume, req->targetlen);
538 char *data = xseg_get_data(peer->xseg, req);
539 mapheader_to_map(map, data);
540 pos = mapheader_size;
542 req->size = req->datalen;
545 if (map->size % block_size)
547 for (i = 0; i < max_objidx; i++) {
548 mn = find_object(map, i);
550 XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551 (unsigned long long) i, map->volume);
554 object_to_map(data+pos, mn);
555 pos += objectsize_in_map;
557 r = xseg_set_req_data(peer->xseg, req, pr);
559 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
563 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
565 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
569 r = xseg_signal(peer->xseg, p);
571 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
573 map->flags |= MF_MAP_WRITING;
574 XSEGLOG2(&lc, I, "Writing map %s", map->volume);
578 xseg_get_req_data(peer->xseg, req, &dummy);
580 xseg_put_request(peer->xseg, req, pr->portno);
582 XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/* Synchronous map flush: submit via __write_map, park until served/failed,
 * then clear MF_MAP_WRITING.  (The flag is set both here and inside
 * __write_map — redundant but harmless.) */
586 static int write_map(struct peer_req* pr, struct map *map)
589 struct peerd *peer = pr->peer;
590 map->flags |= MF_MAP_WRITING;
591 struct xseg_request *req = __write_map(pr, map);
594 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595 if (req->state & XS_FAILED)
597 xseg_put_request(peer->xseg, req, pr->portno);
598 map->flags &= ~MF_MAP_WRITING;
/* Asynchronously read the first block of the map file from the maps
 * blocker; the caller parses it with read_map.  Returns the in-flight
 * request, or NULL via the shared error path at the end.
 * (The "Loading ng map" wording is a pre-existing log-message typo; left
 * untouched since log text is runtime behavior.) */
602 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
606 struct xseg_request *req;
607 struct peerd *peer = pr->peer;
608 struct mapperd *mapper = __get_mapperd(peer);
611 XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
613 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
615 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
620 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
622 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
627 char *reqtarget = xseg_get_target(peer->xseg, req);
630 strncpy(reqtarget, m->volume, req->targetlen);
632 req->size = block_size;
634 r = xseg_set_req_data(peer->xseg, req, pr);
636 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
640 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
642 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
646 r = xseg_signal(peer->xseg, p);
648 XSEGLOG2(&lc, I, "Map %s loading", m->volume);
652 xseg_get_req_data(peer->xseg, req, &dummy);
654 xseg_put_request(peer->xseg, req, pr->portno);
/* Parse a freshly-read map block into map_nodes.
 * Format detection: an all-zero leading hash means an empty/unreadable map;
 * the magic sha256 means archipelago format (type 1: header + fixed-size
 * records); anything else is a legacy pithos map (type 0: a packed array of
 * raw sha256 values, terminated by a zero hash).
 * NOTE(review): `*(uint64_t *)(buf + pos)` is a potentially misaligned,
 * aliasing-unfriendly load (pos == 32 keeps it 8-aligned in practice, but a
 * memcpy into map->size would be strictly correct). */
659 static int read_map (struct map *map, char *buf)
661 char nulls[SHA256_DIGEST_SIZE];
662 memset(nulls, 0, SHA256_DIGEST_SIZE);
664 int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
666 XSEGLOG2(&lc, D, "Read zeros");
670 //type 1, our type, type 0 pithos map
671 int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672 XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
675 struct map_node *map_node;
677 pos = SHA256_DIGEST_SIZE;
678 map->size = *(uint64_t *) (buf + pos);
679 pos += sizeof(uint64_t);
680 nr_objs = map->size / block_size;
681 if (map->size % block_size)
683 map_node = calloc(nr_objs, sizeof(struct map_node));
687 for (i = 0; i < nr_objs; i++) {
688 map_node[i].map = map;
689 map_node[i].objectidx = i;
690 map_node[i].waiters = 0;
692 map_node[i].cond = st_cond_new(); //FIXME err check;
693 map_to_object(&map_node[i], buf + pos);
694 pos += objectsize_in_map;
695 r = insert_object(map, &map_node[i]); //FIXME error check
699 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700 map_node = calloc(max_nr_objs, sizeof(struct map_node));
703 for (i = 0; i < max_nr_objs; i++) {
704 if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
706 map_node[i].objectidx = i;
707 map_node[i].map = map;
708 map_node[i].waiters = 0;
710 map_node[i].cond = st_cond_new(); //FIXME err check;
711 pithosmap_to_object(&map_node[i], buf + pos);
712 pos += SHA256_DIGEST_SIZE;
713 r = insert_object(map, &map_node[i]); //FIXME error check
715 map->size = i * block_size;
718 XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
721 //FIXME cleanup on error
/* Synchronous map load: submit via __load_map, park until served/failed,
 * then parse the returned block with read_map. */
724 static int load_map(struct peer_req *pr, struct map *map)
727 struct xseg_request *req;
728 struct peerd *peer = pr->peer;
729 map->flags |= MF_MAP_LOADING;
730 req = __load_map(pr, map);
733 wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734 map->flags &= ~MF_MAP_LOADING;
735 if (req->state & XS_FAILED){
736 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737 xseg_put_request(peer->xseg, req, pr->portno);
740 r = read_map(map, xseg_get_data(peer->xseg, req));
741 xseg_put_request(peer->xseg, req, pr->portno);
/* Submit an asynchronous "open" (exclusive-lock acquisition, presumably
 * X_ACQUIRE — the op assignment is on a line not visible here) for this map
 * to the maps blocker.  Structure mirrors __close_map/__load_map; returns
 * the in-flight request or NULL via the shared error path. */
745 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
749 struct xseg_request *req;
750 struct peerd *peer = pr->peer;
751 struct mapperd *mapper = __get_mapperd(peer);
754 XSEGLOG2(&lc, I, "Opening map %s", m->volume);
756 req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
758 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
763 r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
765 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
770 char *reqtarget = xseg_get_target(peer->xseg, req);
773 strncpy(reqtarget, m->volume, req->targetlen);
775 req->size = block_size;
777 r = xseg_set_req_data(peer->xseg, req, pr);
779 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
783 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
785 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
789 r = xseg_signal(peer->xseg, p);
791 XSEGLOG2(&lc, I, "Map %s opening", m->volume);
795 xseg_get_req_data(peer->xseg, req, &dummy);
797 xseg_put_request(peer->xseg, req, pr->portno);
/* Synchronous open: acquire the map lock via __open_map, park until the
 * request completes, and on success mark the map MF_MAP_EXCLUSIVE (we hold
 * the lock and may cache/modify the map). */
802 static int open_map(struct peer_req *pr, struct map *map)
805 struct xseg_request *req;
806 struct peerd *peer = pr->peer;
808 map->flags |= MF_MAP_OPENING;
809 req = __open_map(pr, map);
812 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
813 map->flags &= ~MF_MAP_OPENING;
814 err = req->state & XS_FAILED;
815 xseg_put_request(peer->xseg, req, pr->portno);
819 map->flags |= MF_MAP_EXCLUSIVE;
/* Associate (or dissociate) an in-flight xseg_request with the map_node it
 * is copying up, in mio->copyups_nodes.  The first half inserts (grow on
 * ERESIZE), the second half deletes (shrink on ERESIZE); the branch that
 * selects between them based on mn being NULL sits on lines not visible in
 * this chunk — confirm. */
827 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
831 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
832 if (r == -XHASH_ERESIZE) {
833 xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
834 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
837 mio->copyups_nodes = new_hashmap;
838 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
842 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
843 if (r == -XHASH_ERESIZE) {
844 xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
845 xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
848 mio->copyups_nodes = new_hashmap;
849 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
/* Reverse lookup: which map_node is this in-flight request copying up? */
856 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
859 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
/* Copy-on-write an object.  The new object name is the hexlified
 * sha256("<volume><objectidx>") — deterministic, so retries land on the
 * same name.  If the source object is the well-known zero block, the data
 * copy is skipped entirely and we go straight to writing the new mapping
 * into the map (copyup_zeroblock path at the bottom); otherwise an X_COPY
 * request is issued to the data blocker and the node is marked
 * MF_OBJECT_COPYING.  Returns the in-flight request or NULL on error. */
865 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
867 struct mapperd *mapper = __get_mapperd(peer);
868 struct mapper_io *mio = __get_mapper_io(pr);
869 struct map *map = mn->map;
874 //struct sha256_ctx sha256ctx;
875 uint32_t newtargetlen;
876 char new_target[XSEG_MAX_TARGETLEN + 1];
877 unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
878 char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
879 strncpy(new_object, map->volume, map->volumelen);
880 sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
881 new_object[XSEG_MAX_TARGETLEN + 19] = 0;
/* libgcrypt one-shot digest of the synthetic name, then hexlify. */
883 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
884 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
885 sprintf (new_target + 2*i, "%02x", buf[i]);
886 newtargetlen = SHA256_DIGEST_SIZE * 2;
888 if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
889 goto copyup_zeroblock;
891 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
892 mapper->bportno, X_ALLOC);
894 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
897 r = xseg_prep_request(peer->xseg, req, newtargetlen,
898 sizeof(struct xseg_request_copy));
900 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
904 char *target = xseg_get_target(peer->xseg, req);
905 strncpy(target, new_target, req->targetlen);
907 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
908 strncpy(xcopy->target, mn->object, mn->objectlen);
909 xcopy->targetlen = mn->objectlen;
912 req->size = block_size;
914 r = xseg_set_req_data(peer->xseg, req, pr);
916 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
919 r = __set_copyup_node(mio, req, mn);
920 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
922 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
925 xseg_signal(peer->xseg, p);
928 mn->flags |= MF_OBJECT_COPYING;
929 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* shared error path: drop the copyup association before releasing req */
933 r = __set_copyup_node(mio, req, NULL);
934 xseg_get_req_data(peer->xseg, req, &dummy);
936 xseg_put_request(peer->xseg, req, pr->portno);
938 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
942 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
943 "Proceeding in writing the new object in map");
944 /* construct a tmp map_node for writing purposes */
945 struct map_node newmn = *mn;
946 newmn.flags = MF_OBJECT_EXIST;
947 strncpy(newmn.object, new_target, newtargetlen);
948 newmn.object[newtargetlen] = 0;
949 newmn.objectlen = newtargetlen;
950 newmn.objectidx = mn->objectidx;
951 req = object_write(peer, pr, map, &newmn);
952 r = __set_copyup_node(mio, req, mn);
954 XSEGLOG2(&lc, E, "Object write returned error for object %s"
955 "\n\t of map %s [%llu]",
956 mn->object, map->volume, (unsigned long long) mn->objectidx);
959 mn->flags |= MF_OBJECT_WRITING;
960 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/* Asynchronously delete one object through the data blocker (the X_DELETE
 * op assignment is on a line not visible here — confirm).  Registers the
 * request→map_node association so deletion_cb can find the node.  Returns
 * the in-flight request or NULL via the shared error path at the end. */
964 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
967 struct peerd *peer = pr->peer;
968 struct mapperd *mapper = __get_mapperd(peer);
969 struct mapper_io *mio = __get_mapper_io(pr);
970 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
971 mapper->bportno, X_ALLOC);
972 XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
974 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
977 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
979 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
982 char *target = xseg_get_target(peer->xseg, req);
983 strncpy(target, mn->object, req->targetlen);
985 req->size = req->datalen;
987 r = xseg_set_req_data(peer->xseg, req, pr);
989 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
992 __set_copyup_node(mio, req, mn);
993 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
995 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
998 r = xseg_signal(peer->xseg, p);
999 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1003 xseg_get_req_data(peer->xseg, req, &dummy);
1005 xseg_put_request(peer->xseg, req, pr->portno);
1007 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
/* Asynchronously delete the map file itself through the maps blocker
 * (mirrors delete_object; note the copyup association is registered with a
 * NULL node here).  Sets MF_MAP_DELETING on successful submit; returns the
 * in-flight request or NULL via the shared error path. */
1011 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1014 struct peerd *peer = pr->peer;
1015 struct mapperd *mapper = __get_mapperd(peer);
1016 struct mapper_io *mio = __get_mapper_io(pr);
1017 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1018 mapper->mbportno, X_ALLOC);
1019 XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1021 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1024 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1026 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1029 char *target = xseg_get_target(peer->xseg, req);
1030 strncpy(target, map->volume, req->targetlen);
1032 req->size = req->datalen;
1034 r = xseg_set_req_data(peer->xseg, req, pr);
1036 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1039 __set_copyup_node(mio, req, NULL);
1040 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1042 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1045 r = xseg_signal(peer->xseg, p);
1046 map->flags |= MF_MAP_DELETING;
1047 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1051 xseg_get_req_data(peer->xseg, req, &dummy);
1053 xseg_put_request(peer->xseg, req, pr->portno);
1055 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
/* Look up a map_node and take a reference on it (the ref increment is on a
 * line not visible in this chunk — presumably mn->ref++; confirm). */
1060 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1062 struct map_node *mn = find_object(map, index);
/* Drop a map_node reference; on last put, destroy its condition variable
 * (the refcount decrement/zero check is on lines not visible here). */
1068 static inline void put_mapnode(struct map_node *mn)
1073 st_cond_destroy(mn->cond);
/* Take a reference on the map (body — presumably map->ref++ — not visible
 * in this chunk). */
1077 static inline void __get_map(struct map *map)
/* Drop a map reference; on last put, wait out any pending per-object
 * operations and release every cached map_node (one put matches the node's
 * init ref, one matches the get_mapnode above). */
1082 static inline void put_map(struct map *map)
1084 struct map_node *mn;
1087 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1090 for (i = 0; i < calc_map_obj(map); i++) {
1091 mn = get_mapnode(map, i);
1093 //make sure all pending operations on all objects are completed
1094 if (mn->flags & MF_OBJECT_NOT_READY){
1095 //this should never happen...
1096 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
/* NOTE(review): `&= MF_OBJECT_DESTROYED` CLEARS every flag except the
 * DESTROYED bit (which is almost certainly not set yet) — this looks like
 * it was meant to be `|= MF_OBJECT_DESTROYED`. Same pattern appears in
 * do_dropcache. Not fixed here because surrounding lines are not visible;
 * verify against the full file. */
1098 mn->flags &= MF_OBJECT_DESTROYED;
1099 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1100 put_mapnode(mn); //matcing get_mapnode;
1101 //assert mn->ref == 0;
1104 mn = find_object(map, 0);
1107 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
/* Allocate and initialize a new map for `name`, register it in the daemon
 * hash.  Returns the map, or NULL after the cleanup path at the bottom
 * (free objects hash, log failure). */
1112 static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1115 struct map *m = malloc(sizeof(struct map));
1117 XSEGLOG2(&lc, E, "Cannot allocate map ");
1121 strncpy(m->volume, name, namelen);
1122 m->volume[namelen] = 0;
1123 m->volumelen = namelen;
1125 m->objects = xhash_new(3, INTEGER);
1127 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1133 m->cond = st_cond_new(); //FIXME err check;
1134 r = insert_map(mapper, m);
1136 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1143 xhash_free(m->objects);
1145 XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
/* Completion callback for object/map deletion requests: recover the
 * map_node from the copyup association, account the failure if any, and
 * release the request. */
1153 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1155 struct peerd *peer = pr->peer;
1156 struct mapperd *mapper = __get_mapperd(peer);
1158 struct mapper_io *mio = __get_mapper_io(pr);
1159 struct map_node *mn = __get_copyup_node(mio, req);
1162 if (req->state & XS_FAILED){
1166 xseg_put_request(peer->xseg, req, pr->portno);
/* Completion callback driving the two-phase copy-on-write:
 *   X_COPY done  -> issue the object_write of the new mapping (phase 2);
 *   X_WRITE done -> commit the new object name into the cached map_node.
 * On failure, clear the in-progress flags and fall through to the cleanup
 * path that releases the request. */
1170 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1172 struct peerd *peer = pr->peer;
1173 struct mapperd *mapper = __get_mapperd(peer);
1175 struct mapper_io *mio = __get_mapper_io(pr);
1176 struct map_node *mn = __get_copyup_node(mio, req);
1178 XSEGLOG2(&lc, E, "Cannot get map node");
1181 __set_copyup_node(mio, req, NULL);
1183 if (req->state & XS_FAILED){
1184 XSEGLOG2(&lc, E, "Req failed");
1185 mn->flags &= ~MF_OBJECT_COPYING;
1186 mn->flags &= ~MF_OBJECT_WRITING;
1189 if (req->op == X_WRITE) {
1190 char *target = xseg_get_target(peer->xseg, req);
1192 //printf("handle object write replyi\n");
1193 __set_copyup_node(mio, req, NULL);
1194 //assert mn->flags & MF_OBJECT_WRITING
1195 mn->flags &= ~MF_OBJECT_WRITING;
/* NOTE(review): tmp's flags are never initialized before map_to_object
 * ORs into them (see map_to_object) — verify against the full file. */
1197 struct map_node tmp;
1198 char *data = xseg_get_data(peer->xseg, req);
1199 map_to_object(&tmp, data);
1200 mn->flags |= MF_OBJECT_EXIST;
/* NOTE(review): exact-equality test — any other flag bit set on the node
 * (e.g. DELETED) makes this fire; likely intentional as a sanity check,
 * but confirm. */
1201 if (mn->flags != MF_OBJECT_EXIST){
1202 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1205 //assert mn->flags & MF_OBJECT_EXIST
1206 strncpy(mn->object, tmp.object, tmp.objectlen);
1207 mn->object[tmp.objectlen] = 0;
1208 mn->objectlen = tmp.objectlen;
1209 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1212 } else if (req->op == X_COPY) {
1213 // issue write_object;
1214 mn->flags &= ~MF_OBJECT_COPYING;
1215 struct map *map = mn->map;
1217 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1221 /* construct a tmp map_node for writing purposes */
1222 char *target = xseg_get_target(peer->xseg, req);
1223 struct map_node newmn = *mn;
1224 newmn.flags = MF_OBJECT_EXIST;
1225 strncpy(newmn.object, target, req->targetlen);
1226 newmn.object[req->targetlen] = 0;
1227 newmn.objectlen = req->targetlen;
1228 newmn.objectidx = mn->objectidx;
1229 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1231 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1232 "\n\t of map %s [%llu]",
1233 mn->object, map->volume, (unsigned long long) mn->objectidx);
1236 mn->flags |= MF_OBJECT_WRITING;
1237 __set_copyup_node (mio, xreq, mn);
1239 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1246 xseg_put_request(peer->xseg, req, pr->portno);
1251 XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1260 struct map_node *mn;
/* Translate a client request (offset, size on the volume) into the
 * scatterlist of backing objects that satisfies it:
 *   1. collect and reference the map_nodes the request spans;
 *   2. for writes, wait for busy nodes and issue copy-ups for
 *      not-yet-existing objects, then park until all copyups drain;
 *   3. resize pr->req in place and write an xseg_reply_map into its data.
 * Returns 0 on success (error paths are mostly on lines not visible in this
 * chunk). */
1265 static int req2objs(struct peer_req *pr, struct map *map, int write)
1268 struct peerd *peer = pr->peer;
1269 struct mapper_io *mio = __get_mapper_io(pr);
1270 char *target = xseg_get_target(peer->xseg, pr->req);
1271 uint32_t nr_objs = calc_nr_obj(pr->req);
1272 uint64_t size = sizeof(struct xseg_reply_map) +
1273 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1274 uint32_t idx, i, ready;
1275 uint64_t rem_size, obj_index, obj_offset, obj_size;
1276 struct map_node *mn;
1278 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1280 /* get map_nodes of request */
1281 struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1283 XSEGLOG2(&lc, E, "Cannot allocate mns");
1287 rem_size = pr->req->size;
1288 obj_index = pr->req->offset / block_size;
1289 obj_offset = pr->req->offset & (block_size -1); //modulo
1290 obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1291 mn = get_mapnode(map, obj_index);
1293 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1298 mns[idx].offset = obj_offset;
1299 mns[idx].size = obj_size;
1300 rem_size -= obj_size;
1301 while (rem_size > 0) {
1305 obj_size = (rem_size > block_size) ? block_size : rem_size;
1306 rem_size -= obj_size;
1307 mn = get_mapnode(map, obj_index);
1309 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1314 mns[idx].offset = obj_offset;
1315 mns[idx].size = obj_size;
1321 while (ready < (idx + 1)){
1323 for (i = 0; i < (idx+1); i++) {
1326 if (mn->flags & MF_OBJECT_NOT_READY) {
1328 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1329 if (mn->flags & MF_OBJECT_DELETED){
1333 XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1341 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1342 //calc new_target, copy up object
1343 if (copyup_object(peer, mn, pr) == NULL){
1344 XSEGLOG2(&lc, E, "Error in copy up object");
1357 while(mio->copyups > 0){
1358 mio->cb = copyup_cb;
1361 st_cond_wait(pr->cond);
1368 XSEGLOG2(&lc, E, "Mio->err");
1372 /* resize request to fit reply */
1373 char buf[XSEG_MAX_TARGETLEN];
1374 strncpy(buf, target, pr->req->targetlen);
1375 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1377 XSEGLOG2(&lc, E, "Cannot resize request");
1380 target = xseg_get_target(peer->xseg, pr->req);
1381 strncpy(target, buf, pr->req->targetlen);
1383 /* structure reply */
1384 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1385 reply->cnt = nr_objs;
1386 for (i = 0; i < (idx+1); i++) {
1387 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1388 reply->segs[i].targetlen = mns[i].mn->objectlen;
1389 reply->segs[i].offset = mns[i].offset;
1390 reply->segs[i].size = mns[i].size;
/* NOTE(review): the loops above run to idx inclusive (`i < idx+1`) but this
 * put loop stops at `i < idx`, apparently leaking one reference on the last
 * map_node — verify against the full file (may be corrected on a line not
 * visible here). */
1393 for (i = 0; i < idx; i++) {
1394 put_mapnode(mns[i].mn);
/* do_dropcache: evict a map's in-memory object cache.
 * Waits for any in-flight operation on each object, marks every map node
 * destroyed, removes the map from the mapper hash and drops the creation
 * reference so the map can be freed.
 * NOTE(review): this view of the file is elided; error paths and the
 * put_mapnode/deleted accounting between the visible lines are not shown. */
1400 static int do_dropcache(struct peer_req *pr, struct map *map)
1402 struct map_node *mn;
1403 struct peerd *peer = pr->peer;
1404 struct mapperd *mapper = __get_mapperd(peer);
1406 XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1407 map->flags |= MF_MAP_DROPPING_CACHE;
1408 for (i = 0; i < calc_map_obj(map); i++) {
1409 mn = get_mapnode(map, i);
1411 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1412 //make sure all pending operations on all objects are completed
1413 if (mn->flags & MF_OBJECT_NOT_READY){
1414 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1416 mn->flags |= MF_OBJECT_DESTROYED; /* BUGFIX: was "&=", which (inside the !DESTROYED branch) zeroed all flags and never actually set DESTROYED */
1421 map->flags &= ~MF_MAP_DROPPING_CACHE;
1422 map->flags |= MF_MAP_DESTROYED;
1423 remove_map(mapper, map);
1424 XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1425 put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
/* do_info: serve an X_INFO request by writing the map's size into the
 * xseg reply buffer of the peer request.
 * NOTE(review): return statement is not visible in this elided view;
 * presumably returns 0 — confirm against the full source. */
1429 static int do_info(struct peer_req *pr, struct map *map)
1431 struct peerd *peer = pr->peer;
1432 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1433 xinfo->size = map->size;
/* do_close: release a map held by this peer. If the map was opened
 * exclusively, the (elided) branch body presumably closes/unlocks it —
 * TODO confirm; then the cached state is always dropped via do_dropcache. */
1438 static int do_close(struct peer_req *pr, struct map *map)
1440 // struct peerd *peer = pr->peer;
1441 // struct xseg_request *req;
1442 if (map->flags & MF_MAP_EXCLUSIVE)
1444 return do_dropcache(pr, map);
/* do_destroy: delete a map and all of its backing objects.
 * Sequence: mark map MF_MAP_DELETING, issue delete_map() and wait for the
 * request to be served/failed, then walk all objects, deleting each one
 * that exists (waiting first for any pending operation on it), and finally
 * close the map (which also drops the cache).
 * NOTE(review): the elided lines between the visible ones handle error
 * paths, the "deleted" counter and put_mapnode calls — not shown here. */
1447 static int do_destroy(struct peer_req *pr, struct map *map)
1450 struct peerd *peer = pr->peer;
1451 struct mapper_io *mio = __get_mapper_io(pr);
1452 struct map_node *mn;
1453 struct xseg_request *req;
1455 XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1456 map->flags |= MF_MAP_DELETING;
1457 req = delete_map(pr, map);
1460 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1461 if (req->state & XS_FAILED){
1462 xseg_put_request(peer->xseg, req, pr->portno);
1463 map->flags &= ~MF_MAP_DELETING;
1466 xseg_put_request(peer->xseg, req, pr->portno);
1468 uint64_t nr_obj = calc_map_obj(map);
1469 uint64_t deleted = 0;
1470 while (deleted < nr_obj){
1472 for (i = 0; i < nr_obj; i++){
1473 mn = get_mapnode(map, i);
1475 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1476 if (mn->flags & MF_OBJECT_EXIST){
1477 //make sure all pending operations on all objects are completed
1478 if (mn->flags & MF_OBJECT_NOT_READY){
1479 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1481 req = delete_object(pr, mn);
1483 if (mio->del_pending){
1492 mn->flags |= MF_OBJECT_DESTROYED; /* BUGFIX: was "&=", which cleared all flags (including EXIST) without setting DESTROYED, so the outer while loop could never observe the object as destroyed */
1499 mio->cb = deletion_cb;
1500 wait_on_pr(pr, mio->del_pending > 0);
1503 map->flags &= ~MF_MAP_DELETING;
1504 XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1505 return do_close(pr, map);
/* do_mapr: serve an X_MAPR (map-read) request. Translates the request's
 * [offset, offset+size) range into backing-object segments via
 * req2objs(pr, map, 0) (0 = read, no copy-up), then logs each segment of
 * the reply at debug level.
 * NOTE(review): nearly identical to do_mapw below except for the req2objs
 * write flag and log text — candidate to factor into one helper. */
1508 static int do_mapr(struct peer_req *pr, struct map *map)
1510 struct peerd *peer = pr->peer;
1511 int r = req2objs(pr, map, 0);
1513 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1515 (unsigned long long) pr->req->offset,
1516 (unsigned long long) (pr->req->offset + pr->req->size));
1519 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1521 (unsigned long long) pr->req->offset,
1522 (unsigned long long) (pr->req->offset + pr->req->size));
1523 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1524 (unsigned long long) pr->req->offset,
1525 (unsigned long long) pr->req->size);
1526 char buf[XSEG_MAX_TARGETLEN+1];
1527 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1529 for (i = 0; i < reply->cnt; i++) {
1530 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1531 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen); /* NUL-terminated on the next line; targetlen <= XSEG_MAX_TARGETLEN keeps this in bounds */
1532 buf[reply->segs[i].targetlen] = 0;
1533 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1534 (unsigned long long) reply->segs[i].offset,
1535 (unsigned long long) reply->segs[i].size);
/* do_mapw: serve an X_MAPW (map-write) request. Same flow as do_mapr but
 * calls req2objs(pr, map, 1) (1 = write), which also triggers copy-up of
 * non-existent objects, then logs each reply segment at debug level. */
1540 static int do_mapw(struct peer_req *pr, struct map *map)
1542 struct peerd *peer = pr->peer;
1543 int r = req2objs(pr, map, 1);
1545 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1547 (unsigned long long) pr->req->offset,
1548 (unsigned long long) (pr->req->offset + pr->req->size));
1551 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1553 (unsigned long long) pr->req->offset,
1554 (unsigned long long) (pr->req->offset + pr->req->size));
1555 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1556 (unsigned long long) pr->req->offset,
1557 (unsigned long long) pr->req->size);
1558 char buf[XSEG_MAX_TARGETLEN+1];
1559 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1561 for (i = 0; i < reply->cnt; i++) {
1562 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1563 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen); /* NUL-terminated on the next line */
1564 buf[reply->segs[i].targetlen] = 0;
1565 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1566 (unsigned long long) reply->segs[i].offset,
1567 (unsigned long long) reply->segs[i].size);
/* do_clone: create a new map ("clonemap", named after the request target)
 * that clones the parent map `map`. Clone size comes from xclone->size,
 * or the parent's size when -1; a clone smaller than the parent is
 * rejected. Each clone object copies the parent's object name where one
 * exists, otherwise the hexlified zero-block hash. The clone map is then
 * persisted with write_map. */
1572 //here map is the parent map
1573 static int do_clone(struct peer_req *pr, struct map *map)
1576 FIXME check if clone map exists
1577 clonemap = get_map(pr, target, targetlen, MF_LOAD);
1579 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1580 // cache on non-exclusive opens or declare a NO_CACHE flag ?
1585 char buf[XSEG_MAX_TARGETLEN];
1586 struct peerd *peer = pr->peer;
1587 struct mapperd *mapper = __get_mapperd(peer);
1588 char *target = xseg_get_target(peer->xseg, pr->req);
1589 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1590 XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1591 struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1595 if (xclone->size == -1)
1596 clonemap->size = map->size;
1598 clonemap->size = xclone->size;
1599 if (clonemap->size < map->size){
1600 target = xseg_get_target(peer->xseg, pr->req);
1601 strncpy(buf, target, pr->req->targetlen);
1602 buf[pr->req->targetlen] = 0;
1603 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1604 "\n\t for requested clone %s",
1605 (unsigned long long) xclone->size,
1606 (unsigned long long) map->size, buf);
1610 //alloc and init map_nodes
/* NOTE(review): "size/block_size + 1" allocates one extra node when size is
 * an exact multiple of block_size; handle_clone rounds with a remainder
 * check instead — confirm which is intended and make them consistent. */
1611 unsigned long c = clonemap->size/block_size + 1;
1612 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1617 for (i = 0; i < clonemap->size/block_size + 1; i++) {
1618 struct map_node *mn = get_mapnode(map, i); /* parent's object at same index, if any */
1620 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1621 map_nodes[i].objectlen = mn->objectlen;
1624 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1625 map_nodes[i].objectlen = strlen(zero_block);
1627 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1628 map_nodes[i].flags = 0;
1629 map_nodes[i].objectidx = i;
1630 map_nodes[i].map = clonemap;
1631 map_nodes[i].ref = 1;
1632 map_nodes[i].waiters = 0;
1633 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1634 r = insert_object(clonemap, &map_nodes[i]);
1636 XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1640 r = write_map(pr, clonemap); /* persist the new clone map */
1642 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
/* open_load_map: open (lock) and/or load a map according to flags.
 * MF_EXCLUSIVE attempts open_map(); MF_FORCE presumably retries/overrides
 * a failed open — TODO confirm, the branch body is elided. The map is then
 * loaded with load_map(); on load failure an already-opened map is
 * (presumably) closed again via the elided cleanup path. */
1652 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1656 if (flags & MF_EXCLUSIVE){
1657 r = open_map(pr, map);
1659 if (flags & MF_FORCE){
1667 r = load_map(pr, map);
1668 if (r < 0 && opened){
/* get_map: look up a map by name in the mapper cache; on a miss with
 * MF_LOAD set, create the map struct and open/load it via open_load_map,
 * dropping the cache again if that fails. A cache hit on a map already
 * flagged MF_MAP_DESTROYED is handled in the (elided) else branch.
 * Returns a referenced map or (presumably) NULL on failure — TODO confirm. */
1674 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1677 struct peerd *peer = pr->peer;
1678 struct mapperd *mapper = __get_mapperd(peer);
1679 struct map *map = find_map(mapper, name, namelen);
1681 if (flags & MF_LOAD){
1682 map = create_map(mapper, name, namelen);
1685 r = open_load_map(pr, map, flags);
1687 do_dropcache(pr, map);
1693 } else if (map->flags & MF_MAP_DESTROYED){
/* map_action: common driver for all per-map operations. Resolves the map
 * via get_map(name, flags), waits until the map leaves any NOT_READY
 * state, runs the given action callback, and drops the cache afterwards
 * unless the map is held exclusively. */
1701 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1702 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1704 //struct peerd *peer = pr->peer;
1707 map = get_map(pr, name, namelen, flags);
1710 if (map->flags & MF_MAP_NOT_READY){
1711 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1715 int r = action(pr, map);
1716 //always drop cache if map not read exclusively
1717 if (!(map->flags & MF_MAP_EXCLUSIVE))
1718 do_dropcache(pr, map);
1719 //maybe capture ref before and compare here?
/* handle_info: st-thread entry point for X_INFO — runs do_info on the
 * request's target map (loaded on demand, no exclusive lock). */
1727 void * handle_info(struct peer_req *pr)
1729 struct peerd *peer = pr->peer;
1730 char *target = xseg_get_target(peer->xseg, pr->req);
1731 int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
/* handle_clone: st-thread entry point for X_CLONE.
 * Two modes: if xclone->targetlen is set, clone an existing parent map via
 * do_clone; otherwise create a brand-new empty volume of xclone->size,
 * populated entirely with zero-block objects, and persist it. On any
 * failure after creation the freshly created map is discarded with
 * do_dropcache. */
1740 void * handle_clone(struct peer_req *pr)
1743 struct peerd *peer = pr->peer;
1744 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1749 if (xclone->targetlen){
1750 r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
1756 char *target = xseg_get_target(peer->xseg, pr->req);
1757 XSEGLOG2(&lc, I, "Creating volume");
1758 map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
1760 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1761 if (map->ref <= 2) //initial one + one ref from __get_map
1762 do_dropcache(pr, map); //we are the only ones using this map. Drop the cache.
1763 put_map(map); //matches get_map
1767 //create a new empty map of size
1768 map = create_map(mapper, target, pr->req->targetlen);
1773 map->size = xclone->size;
1774 //populate_map with zero objects;
1775 uint64_t nr_objs = xclone->size / block_size;
1776 if (xclone->size % block_size)
1779 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1781 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1786 for (i = 0; i < nr_objs; i++) {
1787 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1788 map_nodes[i].objectlen = strlen(zero_block);
1789 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1790 map_nodes[i].flags = 0;
1791 map_nodes[i].objectidx = i;
1792 map_nodes[i].map = map;
1793 map_nodes[i].ref = 1;
1794 map_nodes[i].waiters = 0;
1795 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1796 r = insert_object(map, &map_nodes[i]);
1798 do_dropcache(pr, map);
1803 r = write_map(pr, map); /* persist the newly created volume */
1805 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1806 do_dropcache(pr, map);
1809 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1811 do_dropcache(pr, map); //drop cache here for consistency
/* handle_mapr: st-thread entry point for X_MAPR — runs do_mapr with the
 * map loaded and held exclusively. */
1823 void * handle_mapr(struct peer_req *pr)
1825 struct peerd *peer = pr->peer;
1826 char *target = xseg_get_target(peer->xseg, pr->req);
1827 int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
/* handle_mapw: st-thread entry point for X_MAPW — runs do_mapw with the
 * map loaded, held exclusively, and the lock forced (MF_FORCE) since
 * writes must proceed even over a stale holder. */
1836 void * handle_mapw(struct peer_req *pr)
1838 struct peerd *peer = pr->peer;
1839 char *target = xseg_get_target(peer->xseg, pr->req);
1840 int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1845 XSEGLOG2(&lc, D, "Ta: %d", ta);
/* handle_destroy: st-thread entry point for X_DELETE — runs do_destroy
 * with the map loaded and exclusively (force-)locked. */
1850 void * handle_destroy(struct peer_req *pr)
1852 struct peerd *peer = pr->peer;
1853 char *target = xseg_get_target(peer->xseg, pr->req);
1854 int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
/* handle_close: st-thread entry point for X_CLOSE — runs do_close without
 * MF_LOAD (closing must not trigger a load of an uncached map). */
1863 void * handle_close(struct peer_req *pr)
1865 struct peerd *peer = pr->peer;
1866 char *target = xseg_get_target(peer->xseg, pr->req);
1867 //here we do not want to load
1868 int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
/* dispatch_accepted: map a newly accepted xseg request to its handler
 * coroutine and spawn an st-thread to run it. */
1877 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
1878 struct xseg_request *req)
1880 //struct mapperd *mapper = __get_mapperd(peer);
1881 struct mapper_io *mio = __get_mapper_io(pr);
1882 void *(*action)(struct peer_req *) = NULL;
1884 mio->state = ACCEPTED;
1887 switch (pr->req->op) {
1888 /* primary xseg operations of mapper */
1889 case X_CLONE: action = handle_clone; break;
1890 case X_MAPR: action = handle_mapr; break;
1891 case X_MAPW: action = handle_mapw; break;
1892 // case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1893 case X_INFO: action = handle_info; break;
1894 case X_DELETE: action = handle_destroy; break;
1895 case X_CLOSE: action = handle_close; break;
1896 default: fprintf(stderr, "mydispatch: unknown op\n"); break; /* BUGFIX: message read "unknown up" */
1901 st_thread_create(action, pr, 0, 0); /* NOTE(review): action is still NULL after the default case — a guard is not visible in this elided view; confirm one exists before this call */
/* dispatch: top-level peer dispatch hook. Freshly accepted requests go to
 * dispatch_accepted; other reasons (replies, etc.) are handled in the
 * elided remainder of the function. */
1907 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1908 enum dispatch_reason reason)
1910 struct mapperd *mapper = __get_mapperd(peer);
1912 struct mapper_io *mio = __get_mapper_io(pr);
1916 if (reason == dispatch_accept)
1917 dispatch_accepted(peer, pr, req);
/* custom_peer_init: mapperd start-up.
 * Initializes libgcrypt (thread callbacks, version check, secmem off),
 * precomputes the magic-string SHA256 and the hexlified hash of an
 * all-zero block, allocates the mapperd state and per-request mapper_io
 * structs, parses -bp/-mbp/-t arguments, and raises scheduling priority.
 * NOTE(review): mallocs below are unchecked (see the FIXME); error
 * handling lives in the elided lines, if anywhere. */
1928 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1931 unsigned char buf[SHA256_DIGEST_SIZE];
1932 unsigned char *zero;
1934 gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1936 /* Version check should be the very first call because it
1937 makes sure that important subsystems are initialized. */
1938 gcry_check_version (NULL);
1940 /* Disable secure memory. */
1941 gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1943 /* Tell Libgcrypt that initialization has completed. */
1944 gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1946 /* calculate out magic sha hash value */
1947 gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1949 /* calculate zero block */
1950 //FIXME check hash value
1951 zero = malloc(block_size);
1952 memset(zero, 0, block_size);
1953 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1954 for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1955 sprintf(zero_block + 2*i, "%02x", buf[i]);
1956 printf("%s \n", zero_block);
1959 //FIXME error checks
1960 struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1961 peer->priv = mapperd;
1963 mapper->hashmaps = xhash_new(3, STRING); /* NOTE(review): "mapper" vs the "mapperd" local above — declaration of "mapper" is in an elided line; verify they refer to the same object */
1965 for (i = 0; i < peer->nr_ops; i++) {
1966 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1967 mio->copyups_nodes = xhash_new(3, INTEGER);
1971 peer->peer_reqs[i].priv = mio;
1974 for (i = 0; i < argc; i++) {
1975 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1976 mapper->bportno = atoi(argv[i+1]);
1980 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1981 mapper->mbportno = atoi(argv[i+1]);
1985 /* enforce only one thread */
1986 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1987 int t = atoi(argv[i+1]);
1989 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1997 const struct sched_param param = { .sched_priority = 99 };
1998 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param); /* BUGFIX: source contained mojibake "¶m" for "&param" */
/* print_obj: debug helper — dump one map node (index, object name and
 * length, existence flag) to stderr. */
2006 void print_obj(struct map_node *mn)
2008 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
2009 (unsigned long long) mn->objectidx, mn->object,
2010 (unsigned int) mn->objectlen,
2011 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/* print_map: debug helper — dump a map's header (name, size, object
 * count, rounding up for a partial last block) and then every object node.
 * Bails out early (elided branch) when nr_objs exceeds a sanity bound. */
2014 void print_map(struct map *m)
2016 uint64_t nr_objs = m->size/block_size;
2017 if (m->size % block_size)
2019 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n",
2020 m->volume, m->volumelen,
2021 (unsigned long long) m->size,
2022 (unsigned long long) nr_objs);
2024 struct map_node *mn;
2025 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2027 for (i = 0; i < nr_objs; i++) {
2028 mn = find_object(m, i);
2030 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2038 void test_map(struct peerd *peer)
2041 //struct sha256_ctx sha256ctx;
2042 unsigned char buf[SHA256_DIGEST_SIZE];
2043 char buf_new[XSEG_MAX_TARGETLEN + 20];
2044 struct map *m = malloc(sizeof(struct map));
2045 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2046 m->volume[XSEG_MAX_TARGETLEN] = 0;
2047 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2048 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2049 m->volumelen = XSEG_MAX_TARGETLEN;
2050 m->size = 100*block_size;
2051 m->objects = xhash_new(3, INTEGER);
2052 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2053 for (i = 0; i < 100; i++) {
2054 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2055 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2057 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2058 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2060 map_node[i].objectidx = i;
2061 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2062 map_node[i].flags = MF_OBJECT_EXIST;
2063 ret = insert_object(m, &map_node[i]);
2066 char *data = malloc(block_size);
2067 mapheader_to_map(m, data);
2068 uint64_t pos = mapheader_size;
2070 for (i = 0; i < 100; i++) {
2071 map_node = find_object(m, i);
2073 printf("no object node %d \n", i);
2076 object_to_map(data+pos, map_node);
2077 pos += objectsize_in_map;
2081 struct map *m2 = malloc(sizeof(struct map));
2082 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2083 m->volume[XSEG_MAX_TARGETLEN] = 0;
2084 m->volumelen = XSEG_MAX_TARGETLEN;
2086 m2->objects = xhash_new(3, INTEGER);
2087 ret = read_map(peer, m2, data);
2090 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2092 while (sum < block_size) {
2093 r = write(fd, data + sum, block_size -sum);
2096 printf("write error\n");
2102 map_node = find_object(m, 0);