/*
 * Copyright 2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/syscall.h>
#include <openssl/sha.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
#include <xtypes/xlock.h>
#include <xtypes/xhash.h>
/* general mapper flags */
#define MF_LOAD      (1 << 0)
#define MF_EXCLUSIVE (1 << 1)
#define MF_FORCE     (1 << 2)
#define MF_ARCHIP    (1 << 3)
#ifndef SHA256_DIGEST_SIZE
#define SHA256_DIGEST_SIZE 32
#endif

/* the hex representation of a sha256 value takes up twice its size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
#define block_size (1 << 22) //FIXME this should be defined here?
/* transparency byte + max object len on disk */
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)

/* Map header contains:
 *	map version (uint32_t)
 *	volume size (uint64_t)
 */
#define mapheader_size (sizeof(uint32_t) + sizeof(uint64_t))
#define MAPPER_PREFIX "archip_"
#define MAPPER_PREFIX_LEN 7

#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
#endif
#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)

#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
#endif
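
/*
 * Worked example: with MAPPER_PREFIX_LEN = 7 and a 64-character hex digest,
 * MAX_OBJECT_LEN = 7 + 64 = 71 characters, e.g. "archip_e3b0c442...7852b855".
 * XSEG_MAX_TARGETLEN must hold at least that many characters, which the
 * #error above enforces at compile time.
 */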
#define MAX_VOLUME_SIZE \
	((uint64_t) (((block_size - mapheader_size) / objectsize_in_map) * block_size))
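
/*
 * Worked example: block_size = 1 << 22 = 4 MiB, mapheader_size = 4 + 8 = 12
 * bytes, objectsize_in_map = 33 bytes. One map block can therefore index
 * (4194304 - 12) / 33 = 127099 objects, so
 * MAX_VOLUME_SIZE = 127099 * 4 MiB, a little under 500 GiB.
 */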
//char *zero_block = "0000000000000000000000000000000000000000000000000000000000000000";

/* Pithos considers this a block full of zeros, so should we.
 * It is actually the sha256 hash of the empty input.
 */
char *zero_block = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
/* dispatch_internal mapper states */

typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
/* mapper object flags */
#define MF_OBJECT_EXIST        (1 << 0)
#define MF_OBJECT_COPYING      (1 << 1)
#define MF_OBJECT_WRITING      (1 << 2)
#define MF_OBJECT_DELETING     (1 << 3)
#define MF_OBJECT_DESTROYED    (1 << 5)
#define MF_OBJECT_SNAPSHOTTING (1 << 6)

#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
				MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
struct map_node {
	uint32_t flags;
	uint32_t objectidx;
	uint32_t objectlen;
	char object[MAX_OBJECT_LEN + 1]; /* NULL-terminated string */
	struct map *map;
	uint32_t ref;
	uint32_t waiters;
	st_cond_t cond;
};
#define wait_on_pr(__pr, __condition__) \
	do { \
		__get_mapper_io(pr)->active = 0; \
		XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
		st_cond_wait(__pr->cond); \
	} while (__condition__)
#define wait_on_mapnode(__mn, __condition__) \
	do { \
		__mn->waiters++; \
		XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, " \
				"ta: %u", __mn, __mn->object, __mn->waiters, ta); \
		st_cond_wait(__mn->cond); \
	} while (__condition__)
#define wait_on_map(__map, __condition__) \
	do { \
		__map->waiters++; \
		XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u", \
				__map, __map->volume, __map->waiters, ta); \
		st_cond_wait(__map->cond); \
	} while (__condition__)
#define signal_pr(__pr) \
	do { \
		if (!__get_mapper_io(pr)->active) { \
			XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta); \
			__get_mapper_io(pr)->active = 1; \
			st_cond_signal(__pr->cond); \
		} \
	} while (0)
#define signal_map(__map) \
	do { \
		XSEGLOG2(&lc, D, "Checking map %lx %s. Waiters %u, ta: %u", \
				__map, __map->volume, __map->waiters, ta); \
		if (__map->waiters) { \
			ta += __map->waiters; \
			XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, " \
					"ta: %u", __map, __map->volume, __map->waiters, ta); \
			__map->waiters = 0; \
			st_cond_broadcast(__map->cond); \
		} \
	} while (0)
#define signal_mapnode(__mn) \
	do { \
		if (__mn->waiters) { \
			ta += __mn->waiters; \
			XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: " \
					"%u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
			__mn->waiters = 0; \
			st_cond_broadcast(__mn->cond); \
		} \
	} while (0)
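
/*
 * Minimal usage sketch (assuming the st_cond_* condition primitives this
 * peer links against): a worker blocks until a map leaves a transitional
 * state, and the completion path wakes every waiter exactly once:
 *
 *	if (map->flags & MF_MAP_NOT_READY)
 *		wait_on_map(map, map->flags & MF_MAP_NOT_READY);
 *	...
 *	map->flags &= ~MF_MAP_LOADING;
 *	signal_map(map);
 *
 * The condition is re-checked after every wakeup, so spurious signals
 * are harmless.
 */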
/* mapper map flags */
#define MF_MAP_LOADING        (1 << 0)
#define MF_MAP_DESTROYED      (1 << 1)
#define MF_MAP_WRITING        (1 << 2)
#define MF_MAP_DELETING       (1 << 3)
#define MF_MAP_DROPPING_CACHE (1 << 4)
#define MF_MAP_EXCLUSIVE      (1 << 5)
#define MF_MAP_OPENING        (1 << 6)
#define MF_MAP_CLOSING        (1 << 7)
#define MF_MAP_DELETED        (1 << 8)
#define MF_MAP_SNAPSHOTTING   (1 << 9)

#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
				MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|\
				MF_MAP_SNAPSHOTTING)
struct map {
	uint32_t version;
	uint32_t flags;
	uint64_t size;
	uint32_t volumelen;
	char volume[MAX_VOLUME_LEN + 1]; /* NULL-terminated string */
	xhash_t *objects; /* obj_index --> map_node */
	uint32_t ref;
	uint32_t waiters;
	st_cond_t cond;
};
struct mapperd {
	xport bportno;     /* blocker that accesses data */
	xport mbportno;    /* blocker that accesses maps */
	xhash_t *hashmaps; /* hash_function(target) --> struct map */
};
struct mapper_io {
	volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
	xhash_t *copyups_nodes;    /* hash map (xseg_request) --> (map_node of copied-up object) */
	struct map_node *copyup_node;
	volatile int err;          /* error flag */
	volatile uint64_t del_pending;
	volatile uint64_t snap_pending;
	cb_t cb;                   /* callback invoked on request completion */
	enum mapper_state state;
	volatile int active;       /* used by wait_on_pr/signal_pr */
};
struct mapperd *mapper;

void print_map(struct map *m);
void custom_peer_usage()
{
	fprintf(stderr, "Custom peer options: \n"
			"-bp  : port for block blocker(!)\n"
			"-mbp : port for map blocker\n"
			"\n");
}
static inline struct mapperd * __get_mapperd(struct peerd *peer)
{
	return (struct mapperd *) peer->priv;
}

static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
{
	return (struct mapper_io *) pr->priv;
}
static inline uint64_t calc_map_obj(struct map *map)
{
	uint64_t nr_objs = map->size / block_size;
	if (map->size % block_size)
		nr_objs++;
	return nr_objs;
}
static uint32_t calc_nr_obj(struct xseg_request *req)
{
	uint32_t nr_objs = 1;
	uint64_t rem_size = req->size;
	uint64_t obj_offset = req->offset & (block_size - 1); /* modulo */
	uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
	rem_size -= obj_size;
	while (rem_size > 0) {
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
		nr_objs++;
	}
	return nr_objs;
}
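
/*
 * Worked example: with 4 MiB blocks, a request of size 6 MiB at offset
 * 3 MiB touches 3 objects: 1 MiB from the first block (the remainder of
 * the block after the 3 MiB in-block offset), 4 MiB from the second and
 * 1 MiB from the third.
 */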
/*
 * Unsafe. Doesn't check if data length is odd!
 */
static void hexlify(unsigned char *data, char *hex)
{
	int i;
	for (i = 0; i < SHA256_DIGEST_LENGTH; i++)
		sprintf(hex + 2 * i, "%02x", data[i]);
}
static void unhexlify(char *hex, unsigned char *data)
{
	int i;
	unsigned char c;
	for (i = 0; i < SHA256_DIGEST_LENGTH; i++) {
		data[i] = 0;
		/* high nibble */
		c = hex[2 * i];
		c = isdigit(c) ? c - '0' : tolower(c) - 'a' + 10;
		data[i] |= (c << 4) & 0xF0;
		/* low nibble */
		c = hex[2 * i + 1];
		c = isdigit(c) ? c - '0' : tolower(c) - 'a' + 10;
		data[i] |= c & 0x0F;
	}
}
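
/*
 * Usage sketch (round trip over one SHA256_DIGEST_LENGTH-byte digest):
 *
 *	unsigned char digest[SHA256_DIGEST_SIZE], back[SHA256_DIGEST_SIZE];
 *	char hex[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
 *	SHA256((unsigned char *)"", 0, digest);
 *	hexlify(digest, hex);
 *	hex[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;  // hexlify does not terminate
 *	unhexlify(hex, back);                   // memcmp(digest, back, 32) == 0
 *
 * Callers below always provide 2 * SHA256_DIGEST_SIZE output bytes and
 * terminate the string themselves.
 */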
void merkle_hash(unsigned char *hashes, unsigned long len,
		unsigned char hash[SHA256_DIGEST_SIZE])
{
	uint32_t i, l, s = 2;
	uint32_t nr = len / SHA256_DIGEST_SIZE;
	unsigned char *buf;
	unsigned char tmp_hash[SHA256_DIGEST_SIZE];

	if (!nr) {
		SHA256(hashes, 0, hash);
		return;
	}
	if (nr == 1) {
		memcpy(hash, hashes, SHA256_DIGEST_SIZE);
		return;
	}
	while (s < nr)
		s = s << 1;
	buf = malloc(sizeof(unsigned char) * SHA256_DIGEST_SIZE * s);
	memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
	memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
	for (l = s; l > 1; l = l/2) {
		for (i = 0; i < l; i += 2) {
			SHA256(buf + (i * SHA256_DIGEST_SIZE),
					2 * SHA256_DIGEST_SIZE, tmp_hash);
			memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
					tmp_hash, SHA256_DIGEST_SIZE);
		}
	}
	memcpy(hash, buf, SHA256_DIGEST_SIZE);
	free(buf);
}
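
/*
 * merkle_hash() pads the leaf level with zero hashes up to the next power
 * of two, then repeatedly hashes adjacent pairs until one root remains.
 * E.g. 3 input hashes are padded to 4 leaves, and two passes reduce
 * 4 -> 2 -> 1. The root depends only on the leaf contents, so two maps
 * holding identical objects hash to the same value (see do_snapshot).
 */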
/*
 * Maps handling functions
 */

static struct map * find_map(struct mapperd *mapper, char *volume)
{
	struct map *m = NULL;
	int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
				(xhashidx *) &m);
	if (r < 0)
		return NULL;
	return m;
}
static struct map * find_map_len(struct mapperd *mapper, char *target,
					uint32_t targetlen, uint32_t flags)
{
	char buf[XSEG_MAX_TARGETLEN + 1];
	if (flags & MF_ARCHIP) {
		strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
		strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
		buf[MAPPER_PREFIX_LEN + targetlen] = 0;
		targetlen += MAPPER_PREFIX_LEN;
	} else {
		strncpy(buf, target, targetlen);
		buf[targetlen] = 0;
	}

	if (targetlen > MAX_VOLUME_LEN) {
		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
				targetlen, MAX_VOLUME_LEN);
		return NULL;
	}

	XSEGLOG2(&lc, D, "Looking up map %s, len %u", buf, targetlen);
	return find_map(mapper, buf);
}
static int insert_map(struct mapperd *mapper, struct map *map)
{
	int r = -1;

	if (find_map(mapper, map->volume)) {
		XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
		goto out;
	}

	XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
			map->volume, strlen(map->volume), (unsigned long) map);
	r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
		if (!new_hashmap) {
			XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
			goto out;
		}
		mapper->hashmaps = new_hashmap;
		r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	}
out:
	return r;
}
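
/*
 * Note on the retry loop above: xhash tables have fixed capacity, so a full
 * table makes xhash_insert() fail with -XHASH_ERESIZE; the caller grows the
 * table with xhash_resize() and retries. Deletions mirror this with
 * xhash_shrink_size_shift() to shrink sparse tables, as below.
 */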
static int remove_map(struct mapperd *mapper, struct map *map)
{
	int r = -1;

	//assert no pending pr on map

	r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
		if (!new_hashmap) {
			XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
			goto out;
		}
		mapper->hashmaps = new_hashmap;
		r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	}
out:
	return r;
}
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
{
	int r;
	xport p;
	void *dummy;
	struct peerd *peer = pr->peer;
	struct xseg_request *req;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Closing map %s", map->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				map->volume);
		goto out_err;
	}

	r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				map->volume);
		goto out_put;
	}

	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, map->volume, req->targetlen);
	req->op = X_RELEASE;
	req->size = 0;
	req->offset = 0;

	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				map->volume);
		goto out_put;
	}

	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				map->volume);
		goto out_unset;
	}

	r = xseg_signal(peer->xseg, p);
	map->flags |= MF_MAP_CLOSING;

	XSEGLOG2(&lc, I, "Map %s closing", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	return NULL;
}
static int close_map(struct peer_req *pr, struct map *map)
{
	int err;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;

	req = __close_map(pr, map);
	if (!req)
		return -1;

	wait_on_pr(pr, (!((req->state & XS_FAILED) || (req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_CLOSING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
	if (err)
		return -1;
	return 0;
}
static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
				char *target, uint32_t targetlen, struct map **m)
{
	struct mapperd *mapper = __get_mapperd(peer);
	int r;

	*m = find_map(mapper, target, targetlen);
	if (*m) {
		XSEGLOG2(&lc, D, "Found map %s (%lx)", (*m)->volume, (unsigned long) *m);
		if ((*m)->flags & MF_MAP_NOT_READY) {
			__xq_append_tail(&(*m)->pending, (xqindex) pr);
			XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
			return MF_PENDING;
		//} else if ((*m)->flags & MF_MAP_DESTROYED){
		//	return -1;
		} else {
			XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
			return 0;
		}
	}
	r = open_map(peer, pr, target, targetlen, 0);
	return r;
}
/*
 * Object handling functions
 */

struct map_node *find_object(struct map *map, uint64_t obj_index)
{
	struct map_node *mn;
	int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
	if (r < 0)
		return NULL;
	return mn;
}
static int insert_object(struct map *map, struct map_node *mn)
{
	//FIXME no find object first
	int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		unsigned long shift = xhash_grow_size_shift(map->objects);
		map->objects = xhash_resize(map->objects, shift, NULL);
		if (!map->objects)
			return -1;
		r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	}
	return r;
}
/*
 * Map read/write functions
 *
 * version 0 -> pithos map
 * version 1 -> archipelago version 1
 *
 * Each version implements the following operations:
 *
 *	int read_object(struct map_node *mn, unsigned char *buf)
 *	int prepare_write_object(struct peer_req *pr, struct map *map,
 *				struct map_node *mn, struct xseg_request *req)
 *	int read_map(struct map *m, unsigned char *data)
 *	int prepare_write_map(struct peer_req *pr, struct map *map,
 *				struct xseg_request *req)
 */

struct map_functions {
	int (*read_object)(struct map_node *mn, unsigned char *buf);
	int (*prepare_write_object)(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req);
	int (*read_map)(struct map *m, unsigned char *data);
	int (*prepare_write_map)(struct peer_req *pr, struct map *map,
				struct xseg_request *req);
};
/* version 0 functions */

#define v0_mapheader_size 0
/* just the unhexlified object name */
#define v0_objectsize_in_map SHA256_DIGEST_SIZE
static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
{
	hexlify(buf, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
	mn->flags = MF_OBJECT_EXIST;
	return 0;
}
static void v0_object_to_map(struct map_node *mn, unsigned char *data)
{
	unhexlify(mn->object, data);
}
static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;

	unsigned char *data = xseg_get_data(pr->peer->xseg, req);
	v0_object_to_map(mn, data);
	return 0;
}
static int read_map_v0(struct map *m, unsigned char *data)
{
	int r;
	struct map_node *map_node;
	uint64_t i;
	uint64_t pos = 0;
	uint64_t max_nr_objs = block_size / SHA256_DIGEST_SIZE;
	XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);
	map_node = calloc(max_nr_objs, sizeof(struct map_node));
	if (!map_node)
		return -1;
	for (i = 0; i < max_nr_objs; i++) {
		if (!memcmp(data + pos, nulls, v0_objectsize_in_map))
			break;
		map_node[i].objectidx = i;
		map_node[i].map = m;
		map_node[i].waiters = 0;
		map_node[i].ref = 1;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v0(&map_node[i], data + pos);
		pos += v0_objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
	}
	XSEGLOG2(&lc, D, "Found %llu objects", i);
	m->size = i * block_size;
	return 0;
}
static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
				struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
	struct map_node *mn;

	int r = xseg_prep_request(peer->xseg, req, map->volumelen,
			v0_mapheader_size + max_objidx * v0_objectsize_in_map);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);

	req->op = X_WRITE;
	req->size = req->datalen;
	req->offset = 0;

	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
		if (!mn) {
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, map->volume);
			return -1;
		}
		v0_object_to_map(mn, (unsigned char *)(data + pos));
		pos += v0_objectsize_in_map;
	}
	XSEGLOG2(&lc, D, "Prepared %llu objects", i);
	return 0;
}
/* static struct map_functions map_functions_v0 = { */
/* 	.read_object = read_object_v0, */
/* 	.read_map = read_map_v0, */
/* 	.prepare_write_object = prepare_write_object_v0, */
/* 	.prepare_write_map = prepare_write_map_v0 */
/* }; */
#define map_functions_v0 { \
			.read_object = read_object_v0, \
			.read_map = read_map_v0, \
			.prepare_write_object = prepare_write_object_v0, \
			.prepare_write_map = prepare_write_map_v0 \
			}
/* version 1 functions */

/* transparency byte + max object len on disk */
#define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)

/* Map header contains:
 *	map version (uint32_t)
 *	volume size (uint64_t)
 */
#define v1_mapheader_size (sizeof(uint32_t) + sizeof(uint64_t))
static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
{
	char c = buf[0];
	mn->flags = 0;
	if (c) {
		mn->flags |= MF_OBJECT_EXIST;
		strcpy(mn->object, MAPPER_PREFIX);
		hexlify(buf + 1, mn->object + MAPPER_PREFIX_LEN);
		mn->object[MAX_OBJECT_LEN] = 0;
		mn->objectlen = strlen(mn->object);
	} else {
		mn->flags &= ~MF_OBJECT_EXIST;
		hexlify(buf + 1, mn->object);
		mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
		mn->objectlen = strlen(mn->object);
	}
	return 0;
}
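
/*
 * On-disk layout of one v1 map entry (v1_objectsize_in_map = 33 bytes):
 *
 *	byte 0      : transparency flag; non-zero means the object has been
 *	              copied up and lives under the "archip_" prefix
 *	bytes 1..32 : raw (unhexlified) sha256 of the object name
 *
 * read_object_v1() above and v1_object_to_map() below are inverses over
 * this encoding.
 */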
static inline void v1_object_to_map(char *buf, struct map_node *mn)
{
	buf[0] = (mn->flags & MF_OBJECT_EXIST) ? 1 : 0;
	if (buf[0]) {
		/* strip common prefix */
		unhexlify(mn->object + MAPPER_PREFIX_LEN, (unsigned char *)(buf + 1));
	} else {
		unhexlify(mn->object, (unsigned char *)(buf + 1));
	}
}
static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;

	char *data = xseg_get_data(pr->peer->xseg, req);
	v1_object_to_map(data, mn);
	return 0;
}
static int read_map_v1(struct map *m, unsigned char *data)
{
	int r;
	struct map_node *map_node;
	uint64_t i;
	uint64_t pos = 0;
	uint64_t nr_objs;

	/* read header */
	m->version = *(uint32_t *) (data + pos);
	pos += sizeof(uint32_t);
	m->size = *(uint64_t *) (data + pos);
	pos += sizeof(uint64_t);

	/* read objects */
	nr_objs = m->size / block_size;
	if (m->size % block_size)
		nr_objs++;
	map_node = calloc(nr_objs, sizeof(struct map_node));
	if (!map_node)
		return -1;

	for (i = 0; i < nr_objs; i++) {
		map_node[i].map = m;
		map_node[i].objectidx = i;
		map_node[i].waiters = 0;
		map_node[i].ref = 1;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v1(&map_node[i], data + pos);
		pos += objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
	}
	return 0;
}
static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
				struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
	struct map_node *mn;

	int r = xseg_prep_request(peer->xseg, req, m->volumelen,
			v1_mapheader_size + max_objidx * v1_objectsize_in_map);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, m->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);

	memcpy(data + pos, &m->version, sizeof(m->version));
	pos += sizeof(m->version);
	memcpy(data + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);

	req->op = X_WRITE;
	req->size = req->datalen;
	req->offset = 0;

	for (i = 0; i < max_objidx; i++) {
		mn = find_object(m, i);
		if (!mn) {
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, m->volume);
			return -1;
		}
		v1_object_to_map(data + pos, mn);
		pos += v1_objectsize_in_map;
	}
	return 0;
}
/* static struct map_functions map_functions_v1 = { */
/* 	.read_object = read_object_v1, */
/* 	.read_map = read_map_v1, */
/* 	.prepare_write_object = prepare_write_object_v1, */
/* 	.prepare_write_map = prepare_write_map_v1 */
/* }; */
#define map_functions_v1 { \
			.read_object = read_object_v1, \
			.read_map = read_map_v1, \
			.prepare_write_object = prepare_write_object_v1, \
			.prepare_write_map = prepare_write_map_v1 \
			}
static struct map_functions map_functions[] = { map_functions_v0,
						map_functions_v1 };
#define MAP_LATEST_VERSION 1
/* end of map read/write functions */
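
/*
 * Dispatch sketch: callers index the table by the version stored in the
 * map header, e.g.
 *
 *	if (version > MAP_LATEST_VERSION)
 *		return -1;
 *	r = map_functions[version].read_map(map, data);
 *
 * which is what read_map() below does, so adding a new on-disk format
 * means appending one entry to map_functions[].
 */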
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
{
	hexlify(buf, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
	mn->flags = MF_OBJECT_EXIST;
}
static inline void map_to_object(struct map_node *mn, unsigned char *buf)
{
	char c = buf[0];
	mn->flags = 0;
	if (c) {
		mn->flags |= MF_OBJECT_EXIST;
		strcpy(mn->object, MAPPER_PREFIX);
		hexlify(buf + 1, mn->object + MAPPER_PREFIX_LEN);
		mn->object[MAX_OBJECT_LEN] = 0;
		mn->objectlen = strlen(mn->object);
	} else {
		hexlify(buf + 1, mn->object);
		mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
		mn->objectlen = strlen(mn->object);
	}
}
static inline void object_to_map(char *buf, struct map_node *mn)
{
	buf[0] = (mn->flags & MF_OBJECT_EXIST) ? 1 : 0;
	if (buf[0]) {
		/* strip common prefix */
		unhexlify(mn->object + MAPPER_PREFIX_LEN, (unsigned char *)(buf + 1));
	} else {
		unhexlify(mn->object, (unsigned char *)(buf + 1));
	}
}
static inline void mapheader_to_map(struct map *m, char *buf)
{
	uint64_t pos = 0;
	memcpy(buf + pos, &m->version, sizeof(m->version));
	pos += sizeof(m->version);
	memcpy(buf + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);
}
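
/*
 * Resulting header layout (host byte order assumed, 12 bytes total):
 *
 *	offset 0, 4 bytes : map format version
 *	offset 4, 8 bytes : volume size in bytes
 *
 * This matches mapheader_size and the parsing order in read_map_v1().
 */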
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
				struct map *map, struct map_node *mn)
{
	int r;
	void *dummy;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return NULL;
	}

	r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare write object");
		goto out_put;
	}
	req->op = X_WRITE;

	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		goto out_put;
	}
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	if (r < 0)
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);

	XSEGLOG2(&lc, I, "Writing object %s \n\t"
			"Map: %s [%llu]",
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
	XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
			"(Map: %s [%llu])",
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	return NULL;
}
static struct xseg_request * __write_map(struct peer_req *pr, struct map *map)
{
	int r;
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
		return NULL;
	}

	r = map_functions[map->version].prepare_write_map(pr, map, req);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare write map");
		goto out_put;
	}
	req->op = X_WRITE;

	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				map->volume);
		goto out_put;
	}
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				map->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	if (r < 0)
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);

	map->flags |= MF_MAP_WRITING;
	XSEGLOG2(&lc, I, "Writing map %s", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
	XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
	return NULL;
}
static int write_map(struct peer_req *pr, struct map *map)
{
	int r = 0;
	struct peerd *peer = pr->peer;
	struct xseg_request *req = __write_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	if (req->state & XS_FAILED)
		r = -1;
	xseg_put_request(peer->xseg, req, pr->portno);
	map->flags &= ~MF_MAP_WRITING;
	return r;
}
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
{
	int r;
	xport p;
	void *dummy;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Loading map %s", m->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				m->volume);
		goto out_err;
	}

	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				m->volume);
		goto out_put;
	}

	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, m->volume, req->targetlen);
	req->op = X_READ;
	req->size = block_size;
	req->offset = 0;

	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				m->volume);
		goto out_put;
	}
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				m->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);

	m->flags |= MF_MAP_LOADING;
	XSEGLOG2(&lc, I, "Map %s loading", m->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	return NULL;
}
static int read_map(struct map *map, unsigned char *buf)
{
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);

	int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
	if (r) {
		XSEGLOG2(&lc, E, "Read zeros");
		return -1;
	}
	/* type 1: archip map, type 0: pithos map */
	int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
	XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
	uint32_t version;
	if (type)
		version = *(uint32_t *) buf; /* version is always the first uint32_t */
	else
		version = 0;
	if (version > MAP_LATEST_VERSION) {
		XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
				map->volume, version);
		return -1;
	}

	r = map_functions[version].read_map(map, buf);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
		return -1;
	}

	XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
	return 0;
}
static int load_map(struct peer_req *pr, struct map *map)
{
	int r;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;

	req = __load_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	map->flags &= ~MF_MAP_LOADING;
	if (req->state & XS_FAILED) {
		XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
		xseg_put_request(peer->xseg, req, pr->portno);
		return -1;
	}
	r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
	xseg_put_request(peer->xseg, req, pr->portno);
	return r;
}
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
						uint32_t flags)
{
	int r;
	xport p;
	void *dummy;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Opening map %s", m->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				m->volume);
		goto out_err;
	}

	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				m->volume);
		goto out_put;
	}

	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, m->volume, req->targetlen);
	req->op = X_ACQUIRE;
	req->size = block_size;
	req->offset = 0;
	if (!(flags & MF_FORCE))
		req->flags = XF_NOSYNC;

	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				m->volume);
		goto out_put;
	}
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				m->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);

	m->flags |= MF_MAP_OPENING;
	XSEGLOG2(&lc, I, "Map %s opening", m->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	return NULL;
}
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
	int err;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;

	req = __open_map(pr, map, flags);
	if (!req)
		return -1;
	wait_on_pr(pr, (!((req->state & XS_FAILED) || (req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_OPENING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
	if (err)
		return -1;
	map->flags |= MF_MAP_EXCLUSIVE;
	return 0;
}
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
{
	int r = 0;
	if (mn) {
		XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
				req, mn, mio);
		r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
		if (r == -XHASH_ERESIZE) {
			xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
			if (!new_hashmap)
				goto out_err;
			mio->copyups_nodes = new_hashmap;
			r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
		}
		if (r < 0) {
			XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
					req, mn, mio);
			goto out_err;
		}
	} else {
		XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
				req, mio);
		r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
		if (r == -XHASH_ERESIZE) {
			xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
			if (!new_hashmap)
				goto out_err;
			mio->copyups_nodes = new_hashmap;
			r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
		}
		if (r < 0) {
			XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
					req, mio);
			goto out_err;
		}
	}
	return 0;

out_err:
	return -1;
}
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
{
	struct map_node *mn;
	int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
	if (r < 0) {
		XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
		return NULL;
	}
	XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
	return mn;
}
static struct xseg_request * __snapshot_object(struct peer_req *pr,
						struct map_node *mn)
{
	int r;
	xport p;
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	//struct map *map = mn->map;

	//assert mn->volume != zero_block
	//assert mn->flags & MF_OBJECT_EXIST
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	r = xseg_prep_request(peer->xseg, req, mn->objectlen,
				sizeof(struct xseg_request_snapshot));
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
		goto out_put;
	}

	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, mn->object, req->targetlen);

	struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
	xsnapshot->target[0] = 0;
	xsnapshot->targetlen = 0;

	req->offset = 0;
	req->size = block_size;
	req->op = X_SNAPSHOT;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
		goto out_mapper_unset;
	}
	xseg_signal(peer->xseg, p);

	mn->flags |= MF_OBJECT_SNAPSHOTTING;
	XSEGLOG2(&lc, I, "Snapshotting object %s", mn->object);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
	return NULL;
}
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
{
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map *map = mn->map;
	int r;
	xport p;
	void *dummy;
	uint32_t newtargetlen;
	char new_target[MAX_OBJECT_LEN + 1];
	unsigned char sha[SHA256_DIGEST_SIZE];

	strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);

	char tmp[XSEG_MAX_TARGETLEN + 1];
	uint32_t tmplen;
	strncpy(tmp, map->volume, map->volumelen);
	sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
	tmp[XSEG_MAX_TARGETLEN] = 0;
	tmplen = strlen(tmp);
	XSEGLOG2(&lc, D, "Base for new target: %s (len: %d)", tmp, tmplen);
	SHA256((unsigned char *)tmp, tmplen, sha);
	hexlify(sha, new_target + MAPPER_PREFIX_LEN);
	newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
	new_target[newtargetlen] = 0;
	XSEGLOG2(&lc, D, "New target: %s (len: %d)", new_target, newtargetlen);

	if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
		goto copyup_zeroblock;

	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	r = xseg_prep_request(peer->xseg, req, newtargetlen,
				sizeof(struct xseg_request_copy));
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
		goto out_put;
	}

	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, new_target, req->targetlen);

	struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
	strncpy(xcopy->target, mn->object, mn->objectlen);
	xcopy->targetlen = mn->objectlen;

	req->offset = 0;
	req->size = block_size;
	req->op = X_COPY;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
		goto out_mapper_unset;
	}
	xseg_signal(peer->xseg, p);

	mn->flags |= MF_OBJECT_COPYING;
	XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
	return NULL;

copyup_zeroblock:
	XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
			" Proceeding in writing the new object in map");
	/* construct a tmp map_node for writing purposes */
	struct map_node newmn = *mn;
	newmn.flags = MF_OBJECT_EXIST;
	strncpy(newmn.object, new_target, newtargetlen);
	newmn.object[newtargetlen] = 0;
	newmn.objectlen = newtargetlen;
	newmn.objectidx = mn->objectidx;
	req = object_write(peer, pr, map, &newmn);
	r = __set_copyup_node(mio, req, mn);
	if (!req) {
		XSEGLOG2(&lc, E, "Object write returned error for object %s"
				"\n\t of map %s [%llu]",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return NULL;
	}
	mn->flags |= MF_OBJECT_WRITING;
	XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
	return req;
}
static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
{
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
		goto out_put;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, mn->object, req->targetlen);
	req->op = X_DELETE;
	req->size = req->datalen;
	req->offset = 0;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
		goto out_mapper_unset;
	}
	r = xseg_signal(peer->xseg, p);
	mn->flags |= MF_OBJECT_DELETING;
	XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Object %s deletion failed", mn->object);
	return NULL;
}
static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
{
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
	if (!req) {
		XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
		goto out_err;
	}
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
		goto out_put;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->op = X_DELETE;
	req->size = req->datalen;
	req->offset = 0;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
		goto out_put;
	}
	/* do not check return value. just make sure there is no node set */
	__set_copyup_node(mio, req, NULL);
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort) {
		XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	map->flags |= MF_MAP_DELETING;
	XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
	return NULL;
}
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
{
	struct map_node *mn = find_object(map, index);
	if (mn)
		mn->ref++;
	return mn;
}

static inline void put_mapnode(struct map_node *mn)
{
	mn->ref--;
	if (!mn->ref) {
		/* clean up the map node */
		st_cond_destroy(mn->cond);
	}
}
static inline void __get_map(struct map *map)
{
	map->ref++;
}

static inline void put_map(struct map *map)
{
	struct map_node *mn;
	uint64_t i;
	XSEGLOG2(&lc, D, "Putting map %lx %s. ref %u", map, map->volume, map->ref);
	map->ref--;
	if (!map->ref) {
		XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
		for (i = 0; i < calc_map_obj(map); i++) {
			mn = get_mapnode(map, i);
			if (mn) {
				//make sure all pending operations on all objects are completed
				//this should never happen...
				if (mn->flags & MF_OBJECT_NOT_READY)
					wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
				mn->flags |= MF_OBJECT_DESTROYED;
				put_mapnode(mn); //matching mn->ref = 1 on mn init
				put_mapnode(mn); //matching get_mapnode above
				//assert mn->ref == 0;
			}
		}
		/* the map nodes were allocated as one array; free via element 0 */
		mn = find_object(map, 0);
		if (mn)
			free(mn);
		XSEGLOG2(&lc, I, "Freed map %s", map->volume);
		free(map);
	}
}
static struct map * create_map(struct mapperd *mapper, char *name,
				uint32_t namelen, uint32_t flags)
{
	int r;
	if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN) {
		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
				namelen, MAX_VOLUME_LEN);
		return NULL;
	}
	struct map *m = malloc(sizeof(struct map));
	if (!m) {
		XSEGLOG2(&lc, E, "Cannot allocate map");
		return NULL;
	}
	if (flags & MF_ARCHIP) {
		strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
		strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
		m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
		m->volumelen = MAPPER_PREFIX_LEN + namelen;
		m->version = 1; /* keep this hardcoded for now */
	} else {
		strncpy(m->volume, name, namelen);
		m->volume[namelen] = 0;
		m->volumelen = namelen;
		m->version = 0; /* version 0 should be pithos maps */
	}
	m->flags = 0;
	m->objects = xhash_new(3, INTEGER);
	if (!m->objects) {
		XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
				m->volume);
		goto out_err;
	}
	m->ref = 1;
	m->waiters = 0;
	m->cond = st_cond_new(); //FIXME err check;
	r = insert_map(mapper, m);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
		goto out_hash;
	}
	return m;

out_hash:
	xhash_free(m->objects);
out_err:
	XSEGLOG2(&lc, E, "Failed to create map %s", m->volume);
	free(m);
	return NULL;
}
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	(void)mapper;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);

	__set_copyup_node(mio, req, NULL);

	//assert req->op == X_DELETE
	//assert pr->req->op == X_DELETE; only map deletions make delete requests
	//assert mio->del_pending > 0
	XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
	mio->del_pending--;

	if (req->state & XS_FAILED)
		mio->err = 1;
	if (mn) {
		XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
				mn, mn->object, mio, req);
		// assert mn->flags & MF_OBJECT_DELETING
		mn->flags &= ~MF_OBJECT_DELETING;
		mn->flags |= MF_OBJECT_DESTROYED;
		signal_mapnode(mn);
		/* put mapnode here, matches get_mapnode on do_destroy */
		put_mapnode(mn);
	} else {
		XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
				mio, req);
	}
	xseg_put_request(peer->xseg, req, pr->portno);
	signal_pr(pr);
}
void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	(void)mapper;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);
	if (!mn) {
		XSEGLOG2(&lc, E, "Cannot get map node");
		goto out_err;
	}
	__set_copyup_node(mio, req, NULL);

	if (req->state & XS_FAILED) {
		if (req->op == X_DELETE) {
			XSEGLOG2(&lc, E, "Delete req failed");
			goto out_ok;
		}
		XSEGLOG2(&lc, E, "Req failed");
		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
		mn->flags &= ~MF_OBJECT_WRITING;
		goto out_err;
	}

	if (req->op == X_WRITE) {
		char old_object_name[MAX_OBJECT_LEN + 1];
		uint32_t old_objectlen;

		//assert mn->flags & MF_OBJECT_WRITING
		mn->flags &= ~MF_OBJECT_WRITING;
		strncpy(old_object_name, mn->object, mn->objectlen);
		old_objectlen = mn->objectlen;

		struct map_node tmp;
		char *data = xseg_get_data(peer->xseg, req);
		map_to_object(&tmp, (unsigned char *) data);
		mn->flags &= ~MF_OBJECT_EXIST;

		strncpy(mn->object, tmp.object, tmp.objectlen);
		mn->object[tmp.objectlen] = 0;
		mn->objectlen = tmp.objectlen;
		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
		//signal_mapnode since snapshot was successful
		signal_mapnode(mn);

		//delete the old object
		strncpy(tmp.object, old_object_name, old_objectlen);
		tmp.object[old_objectlen] = 0;
		tmp.objectlen = old_objectlen;
		tmp.flags = MF_OBJECT_EXIST;
		struct xseg_request *xreq = __delete_object(pr, &tmp);
		if (!xreq) {
			//just a warning. Snapshot was successful
			XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
			goto out_ok;
		}
		//overwrite copyup node, since tmp is a stack dummy variable
		__set_copyup_node(mio, xreq, mn);
		XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
	} else if (req->op == X_SNAPSHOT) {
		//issue write_object;
		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
		struct map *map = mn->map;
		if (!map) {
			XSEGLOG2(&lc, E, "Object %s has no map back pointer", mn->object);
			goto out_err;
		}

		/* construct a tmp map_node for writing purposes */
		struct map_node newmn = *mn;
		struct xseg_reply_snapshot *xreply;
		xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
		//assert xreply->targetlen != 0
		//assert xreply->targetlen < XSEG_MAX_TARGETLEN
		//assert xreply->target valid
		strncpy(newmn.object, xreply->target, xreply->targetlen);
		newmn.object[xreply->targetlen] = 0;
		newmn.objectlen = xreply->targetlen;
		newmn.objectidx = mn->objectidx;
		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
		if (!xreq) {
			XSEGLOG2(&lc, E, "Object write returned error for object %s"
					"\n\t of map %s [%llu]",
					mn->object, map->volume, (unsigned long long) mn->objectidx);
			goto out_err;
		}
		mn->flags |= MF_OBJECT_WRITING;
		__set_copyup_node(mio, xreq, mn);

		XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
	} else if (req->op == X_DELETE) {
		//deletion of the old object completed
		XSEGLOG2(&lc, I, "Deletion of old object completed");
		goto out_ok;
	} else {
		//wtf??
		;
	}

out:
	xseg_put_request(peer->xseg, req, pr->portno);
	return;

out_ok:
	mio->snap_pending--;
	XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
	signal_pr(pr);
	goto out;

out_err:
	mio->snap_pending--;
	mio->err = 1;
	signal_pr(pr);
	goto out;
}
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	(void)mapper;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);
	if (!mn) {
		XSEGLOG2(&lc, E, "Cannot get map node");
		goto out_err;
	}
	__set_copyup_node(mio, req, NULL);

	if (req->state & XS_FAILED) {
		XSEGLOG2(&lc, E, "Req failed");
		mn->flags &= ~MF_OBJECT_COPYING;
		mn->flags &= ~MF_OBJECT_WRITING;
		goto out_err;
	}
	if (req->op == X_WRITE) {
		//assert mn->flags & MF_OBJECT_WRITING
		mn->flags &= ~MF_OBJECT_WRITING;

		struct map_node tmp;
		char *data = xseg_get_data(peer->xseg, req);
		map_to_object(&tmp, (unsigned char *) data);
		mn->flags |= MF_OBJECT_EXIST;
		if (mn->flags != MF_OBJECT_EXIST) {
			XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
			goto out_err;
		}
		//assert mn->flags & MF_OBJECT_EXIST
		strncpy(mn->object, tmp.object, tmp.objectlen);
		mn->object[tmp.objectlen] = 0;
		mn->objectlen = tmp.objectlen;
		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
		mio->copyups--;
		signal_mapnode(mn);
		signal_pr(pr);
	} else if (req->op == X_COPY) {
		//issue write_object;
		mn->flags &= ~MF_OBJECT_COPYING;
		struct map *map = mn->map;
		if (!map) {
			XSEGLOG2(&lc, E, "Object %s has no map back pointer", mn->object);
			goto out_err;
		}

		/* construct a tmp map_node for writing purposes */
		char *target = xseg_get_target(peer->xseg, req);
		struct map_node newmn = *mn;
		newmn.flags = MF_OBJECT_EXIST;
		strncpy(newmn.object, target, req->targetlen);
		newmn.object[req->targetlen] = 0;
		newmn.objectlen = req->targetlen;
		newmn.objectidx = mn->objectidx;
		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
		if (!xreq) {
			XSEGLOG2(&lc, E, "Object write returned error for object %s"
					"\n\t of map %s [%llu]",
					mn->object, map->volume, (unsigned long long) mn->objectidx);
			goto out_err;
		}
		mn->flags |= MF_OBJECT_WRITING;
		__set_copyup_node(mio, xreq, mn);

		XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
	} else {
		//wtf??
		;
	}

out:
	xseg_put_request(peer->xseg, req, pr->portno);
	return;

out_err:
	mio->copyups--;
	XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
	mio->err = 1;
	if (mn)
		signal_mapnode(mn);
	signal_pr(pr);
	goto out;
}
/* maps one slice of a request to (object, in-object offset, size) */
struct r2o {
	struct map_node *mn;
	uint64_t offset;
	uint64_t size;
};
static int req2objs(struct peer_req *pr, struct map *map, int write)
{
	int r = 0;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	char *target = xseg_get_target(peer->xseg, pr->req);
	uint32_t nr_objs = calc_nr_obj(pr->req);
	uint64_t size = sizeof(struct xseg_reply_map) +
			nr_objs * sizeof(struct xseg_reply_map_scatterlist);
	uint32_t idx, i, j;
	uint64_t rem_size, obj_index, obj_offset, obj_size;
	struct map_node *mn;
	mio->copyups = 0;
	mio->err = 0;
	XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);

	/* get map_nodes of request */
	struct r2o *mns = malloc(sizeof(struct r2o) * nr_objs);
	if (!mns) {
		XSEGLOG2(&lc, E, "Cannot allocate mns");
		return -1;
	}
	idx = 0;
	rem_size = pr->req->size;
	obj_index = pr->req->offset / block_size;
	obj_offset = pr->req->offset & (block_size - 1); /* modulo */
	obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
	mn = get_mapnode(map, obj_index);
	if (!mn) {
		XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
		free(mns);
		return -1;
	}
	mns[idx].mn = mn;
	mns[idx].offset = obj_offset;
	mns[idx].size = obj_size;
	rem_size -= obj_size;
	while (rem_size > 0) {
		idx++;
		obj_index++;
		obj_offset = 0;
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
		mn = get_mapnode(map, obj_index);
		if (!mn) {
			XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
			r = -1;
			idx--; /* mns[idx] was not filled */
			goto out;
		}
		mns[idx].mn = mn;
		mns[idx].offset = obj_offset;
		mns[idx].size = obj_size;
	}
	if (write) {
		mio->cb = copyup_cb;
		/* do a first scan and issue as many copyups as we can.
		 * then retry and wait when an object is not ready.
		 * this could be done better, since now we wait also on the
		 * pending copyups
		 */
		for (j = 0; j < 2 && !mio->err; j++) {
			for (i = 0; i < (idx+1); i++) {
				mn = mns[i].mn;
				if (mn->flags & MF_OBJECT_NOT_READY) {
					if (!j)
						continue; /* defer waiting to the second pass */
					if (mn->flags & MF_OBJECT_NOT_READY)
						wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
					if (mn->flags & MF_OBJECT_DESTROYED) {
						mio->err = 1;
						continue;
					}
				}
				if (!(mn->flags & MF_OBJECT_EXIST)) {
					//calc new_target, copy up object
					if (copyup_object(peer, mn, pr) == NULL) {
						XSEGLOG2(&lc, E, "Error in copy up object");
						mio->err = 1;
					} else {
						mio->copyups++;
					}
				}
			}
		}
		if (mio->err)
			XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
		if (mio->copyups > 0)
			wait_on_pr(pr, mio->copyups > 0);
		mio->cb = NULL;
	}

	if (mio->err) {
		r = -1;
		XSEGLOG2(&lc, E, "Mio->err");
		goto out;
	}

	/* resize request to fit reply */
	char buf[XSEG_MAX_TARGETLEN];
	strncpy(buf, target, pr->req->targetlen);
	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot resize request");
		goto out;
	}
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, buf, pr->req->targetlen);

	/* structure reply */
	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
	reply->cnt = nr_objs;
	for (i = 0; i < (idx+1); i++) {
		strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
		reply->segs[i].targetlen = mns[i].mn->objectlen;
		reply->segs[i].offset = mns[i].offset;
		reply->segs[i].size = mns[i].size;
	}
out:
	for (i = 0; i < (idx+1); i++) {
		put_mapnode(mns[i].mn);
	}
	free(mns);
	return r;
}
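
/*
 * The reply built above is an xseg_reply_map (from xseg/protocol.h): a
 * count followed by cnt scatterlist entries, each mapping one slice of the
 * request to (object name, in-object offset, length). A 6 MiB read at
 * offset 3 MiB, for instance, comes back as three segments, matching the
 * arithmetic in calc_nr_obj().
 */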
static int do_dropcache(struct peer_req *pr, struct map *map)
{
	struct map_node *mn;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	uint64_t i;

	XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
	map->flags |= MF_MAP_DROPPING_CACHE;
	for (i = 0; i < calc_map_obj(map); i++) {
		mn = get_mapnode(map, i);
		if (mn) {
			if (!(mn->flags & MF_OBJECT_DESTROYED)) {
				//make sure all pending operations on all objects are completed
				if (mn->flags & MF_OBJECT_NOT_READY)
					wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
				mn->flags |= MF_OBJECT_DESTROYED;
			}
			put_mapnode(mn);
		}
	}
	map->flags &= ~MF_MAP_DROPPING_CACHE;
	map->flags |= MF_MAP_DESTROYED;
	remove_map(mapper, map);
	XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
	put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
	return 0;
}
static int do_info(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
	xinfo->size = map->size;
	return 0;
}
static int do_open(struct peer_req *pr, struct map *map)
{
	if (map->flags & MF_MAP_EXCLUSIVE)
		return 0;
	else
		return -1;
}

static int do_close(struct peer_req *pr, struct map *map)
{
	if (map->flags & MF_MAP_EXCLUSIVE) {
		/* do not drop cache if close failed and map not deleted */
		if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
			return -1;
	}
	return do_dropcache(pr, map);
}
static int do_snapshot(struct peer_req *pr, struct map *map)
{
	uint64_t i;
	int r;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn;
	struct xseg_request *req;

	if (!(map->flags & MF_MAP_EXCLUSIVE)) {
		XSEGLOG2(&lc, E, "Map was not opened exclusively");
		return -1;
	}
	XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
	map->flags |= MF_MAP_SNAPSHOTTING;

	uint64_t nr_obj = calc_map_obj(map);
	mio->cb = snapshot_cb;
	mio->snap_pending = 0;
	mio->err = 0;
	for (i = 0; i < nr_obj; i++) {

		/* throttle pending snapshots
		 * this should be nr_ops of the blocker, but since we don't know
		 * that, we assume based on our own nr_ops
		 */
		if (mio->snap_pending >= peer->nr_ops)
			wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);

		mn = get_mapnode(map, i);
		if (!mn)
			continue;
		if (!(mn->flags & MF_OBJECT_EXIST)) {
			put_mapnode(mn);
			continue;
		}
		// make sure all pending operations on all objects are completed
		if (mn->flags & MF_OBJECT_NOT_READY)
			wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

		/* TODO will this ever happen?? */
		if (mn->flags & MF_OBJECT_DESTROYED) {
			put_mapnode(mn);
			continue;
		}

		req = __snapshot_object(pr, mn);
		if (!req) {
			put_mapnode(mn);
			mio->err = 1;
			break;
		}
		mio->snap_pending++;
		/* do not put_mapnode here. cb does that */
	}

	if (mio->snap_pending > 0)
		wait_on_pr(pr, mio->snap_pending > 0);
	mio->cb = NULL;

	if (mio->err)
		goto out_err;

	/* calculate name of snapshot */
	struct map tmp_map = *map;
	unsigned char sha[SHA256_DIGEST_SIZE];
	unsigned char *buf = malloc(block_size);
	char newvolumename[MAX_VOLUME_LEN];
	uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
	uint64_t pos = 0;
	uint64_t max_objidx = calc_map_obj(map);
	if (!buf)
		goto out_err;

	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
		if (!mn) {
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, map->volume);
			free(buf);
			goto out_err;
		}
		v0_object_to_map(mn, buf + pos);
		pos += v0_objectsize_in_map;
	}
	// SHA256(buf, pos, sha);
	merkle_hash(buf, pos, sha);
	hexlify(sha, newvolumename);
	strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
	tmp_map.volume[newvolumenamelen] = 0;
	tmp_map.volumelen = newvolumenamelen;
	free(buf);
	tmp_map.version = 0; // set volume version to pithos image

	/* write the map of the snapshot */
	r = write_map(pr, &tmp_map);
	if (r < 0)
		goto out_err;
	char targetbuf[XSEG_MAX_TARGETLEN];
	char *target = xseg_get_target(peer->xseg, pr->req);
	strncpy(targetbuf, target, pr->req->targetlen);
	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
			sizeof(struct xseg_reply_snapshot));
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot resize request");
		goto out_err;
	}
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, targetbuf, pr->req->targetlen);

	struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
						xseg_get_data(peer->xseg, pr->req);
	strncpy(xreply->target, newvolumename, newvolumenamelen);
	xreply->targetlen = newvolumenamelen;
	map->flags &= ~MF_MAP_SNAPSHOTTING;
	XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
	return 0;

out_err:
	map->flags &= ~MF_MAP_SNAPSHOTTING;
	XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
	return -1;
}
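
/*
 * Note: the snapshot name computed in do_snapshot() is content-addressed;
 * it is the hexlified Merkle root of the v0-encoded object list, so
 * snapshotting identical data twice yields the same snapshot map name.
 */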
static int do_destroy(struct peer_req *pr, struct map *map)
{
	uint64_t i;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn;
	struct xseg_request *req;

	if (!(map->flags & MF_MAP_EXCLUSIVE))
		return -1;

	XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
	req = __delete_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!((req->state & XS_FAILED) || (req->state & XS_SERVED))));
	if (req->state & XS_FAILED) {
		xseg_put_request(peer->xseg, req, pr->portno);
		map->flags &= ~MF_MAP_DELETING;
		return -1;
	}
	xseg_put_request(peer->xseg, req, pr->portno);

	uint64_t nr_obj = calc_map_obj(map);
	mio->cb = deletion_cb;
	mio->del_pending = 0;
	mio->err = 0;
	for (i = 0; i < nr_obj; i++) {

		/* throttle pending deletions
		 * this should be nr_ops of the blocker, but since we don't know
		 * that, we assume based on our own nr_ops
		 */
		if (mio->del_pending >= peer->nr_ops)
			wait_on_pr(pr, mio->del_pending >= peer->nr_ops);

		mn = get_mapnode(map, i);
		if (!mn)
			continue;
		if (mn->flags & MF_OBJECT_DESTROYED) {
			put_mapnode(mn);
			continue;
		}
		if (!(mn->flags & MF_OBJECT_EXIST)) {
			mn->flags |= MF_OBJECT_DESTROYED;
			put_mapnode(mn);
			continue;
		}

		// make sure all pending operations on all objects are completed
		if (mn->flags & MF_OBJECT_NOT_READY)
			wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

		req = __delete_object(pr, mn);
		if (!req) {
			put_mapnode(mn);
			continue;
		}
		mio->del_pending++;
		/* do not put_mapnode here. cb does that */
	}

	if (mio->del_pending > 0)
		wait_on_pr(pr, mio->del_pending > 0);
	mio->cb = NULL;

	map->flags &= ~MF_MAP_DELETING;
	map->flags |= MF_MAP_DELETED;
	XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
	return do_close(pr, map);
}
static int do_mapr(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	int r = req2objs(pr, map, 0);
	if (r < 0){
		XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
				map->volume,
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
		return -1;
	}
	XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
			map->volume,
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
			(unsigned long long) pr->req->offset,
			(unsigned long long) pr->req->size);
	char buf[XSEG_MAX_TARGETLEN+1];
	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
	int i;
	for (i = 0; i < reply->cnt; i++) {
		XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
		strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
		buf[reply->segs[i].targetlen] = 0;
		XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
				(unsigned long long) reply->segs[i].offset,
				(unsigned long long) reply->segs[i].size);
	}
	return 0;
}

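/*
 * do_mapw: same translation as do_mapr, but for writing (req2objs is
 * called with write == 1), so objects that are shared with a parent or
 * do not yet exist may first have to be copied up or allocated before
 * the segments can be returned.
 */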
static int do_mapw(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	int r = req2objs(pr, map, 1);
	if (r < 0){
		XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
				map->volume,
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
		return -1;
	}
	XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
			map->volume,
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
			(unsigned long long) pr->req->offset,
			(unsigned long long) pr->req->size);
	char buf[XSEG_MAX_TARGETLEN+1];
	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
	int i;
	for (i = 0; i < reply->cnt; i++) {
		XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
		strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
		buf[reply->segs[i].targetlen] = 0;
		XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
				(unsigned long long) reply->segs[i].offset,
				(unsigned long long) reply->segs[i].size);
	}
	return 0;
}

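/*
 * do_clone: create a writable clone of a parent map. Each clone object
 * initially names the corresponding parent object with flags == 0, i.e.
 * without MF_OBJECT_EXIST, so a later write triggers a copy-up instead
 * of overwriting the shared object. Objects beyond the end of the
 * parent map point to zero_block.
 */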
//here map is the parent map
static int do_clone(struct peer_req *pr, struct map *map)
{
	int r, i;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	char *target = xseg_get_target(peer->xseg, pr->req);
	struct map *clonemap;
	struct xseg_request_clone *xclone =
		(struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);

	XSEGLOG2(&lc, I, "Cloning map %s", map->volume);

	clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
	if (!clonemap)
		return -1;

	/* open map to get exclusive access to map */
	r = open_map(pr, clonemap, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}
	r = load_map(pr, clonemap);
	if (r >= 0){
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}

	if (xclone->size == -1)
		clonemap->size = map->size;
	else
		clonemap->size = xclone->size;
	if (clonemap->size < map->size){
		XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
				"\n\t for requested clone %s",
				(unsigned long long) xclone->size,
				(unsigned long long) map->size, clonemap->volume);
		goto out_err;
	}
	if (clonemap->size > MAX_VOLUME_SIZE) {
		XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
				"\n\t for volume %s",
				clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
		goto out_err;
	}

	//alloc and init map_nodes
	unsigned long c = calc_map_obj(clonemap);
	struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
	if (!map_nodes)
		goto out_err;
	for (i = 0; i < c; i++) {
		struct map_node *mn = get_mapnode(map, i);
		if (mn) {
			strncpy(map_nodes[i].object, mn->object, mn->objectlen);
			map_nodes[i].objectlen = mn->objectlen;
			put_mapnode(mn);
		} else {
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
		}
		map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
		map_nodes[i].flags = 0;
		map_nodes[i].objectidx = i;
		map_nodes[i].map = clonemap;
		map_nodes[i].ref = 1;
		map_nodes[i].waiters = 0;
		map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
		r = insert_object(clonemap, &map_nodes[i]);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
			goto out_err;
		}
	}

	r = write_map(pr, clonemap);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
		goto out_err;
	}
	do_close(pr, clonemap);
	return 0;

out_err:
	do_close(pr, clonemap);
	return -1;
}

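/*
 * open_load_map: optionally acquire exclusive access to a map before
 * loading it. With MF_EXCLUSIVE, failure to open is fatal only when
 * MF_FORCE is also set; otherwise we fall back to loading the map
 * without the exclusive lock. If loading fails after a successful
 * open, the map is closed again.
 */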
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
	int r, opened = 0;
	if (flags & MF_EXCLUSIVE){
		r = open_map(pr, map, flags);
		if (r < 0) {
			if (flags & MF_FORCE)
				return -1;
		} else {
			opened = 1;
		}
	}
	r = load_map(pr, map);
	if (r < 0 && opened)
		close_map(pr, map);
	return r;
}

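/*
 * get_map: look up a map in the mapper cache, creating, opening and
 * loading it first when MF_LOAD is set. Returns a referenced map
 * (__get_map) or NULL on failure or if the map has been destroyed.
 */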
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
			uint32_t flags)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct map *map = find_map_len(mapper, name, namelen, flags);
	if (!map){
		if (flags & MF_LOAD){
			map = create_map(mapper, name, namelen, flags);
			if (!map)
				return NULL;
			r = open_load_map(pr, map, flags);
			if (r < 0){
				do_dropcache(pr, map);
				/* signal map here, so any other threads that
				 * tried to get the map, but couldn't because
				 * of the opening or loading operation that
				 * failed, can continue.
				 */
				signal_map(map);
				put_map(map);
				return NULL;
			}
		} else {
			return NULL;
		}
	} else if (map->flags & MF_MAP_DESTROYED){
		return NULL;
	}
	__get_map(map);
	return map;
}

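/*
 * map_action: common wrapper for all map operations. It gets the map,
 * waits while the map is busy (retrying the lookup from scratch), runs
 * the action, and drops the cached map unless it is held exclusively.
 */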
static int map_action(int (action)(struct peer_req *pr, struct map *map),
		struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
{
	//struct peerd *peer = pr->peer;
	struct map *map;
start:
	map = get_map(pr, name, namelen, flags);
	if (!map)
		return -1;
	if (map->flags & MF_MAP_NOT_READY){
		wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
		put_map(map);
		goto start;
	}
	int r = action(pr, map);
	//always drop cache if map is not held exclusively
	if (!(map->flags & MF_MAP_EXCLUSIVE))
		do_dropcache(pr, map);
	signal_map(map);
	put_map(map);
	return r;
}

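/*
 * The handle_* functions below are the entry points of the state
 * threads spawned by dispatch_accepted. Each runs a map_action and
 * then completes or fails the peer request, decrementing the active
 * thread counter ta.
 */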
void * handle_info(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_info, pr, target, pr->req->targetlen,
				MF_ARCHIP|MF_LOAD);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_clone(struct peer_req *pr)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
	if (!xclone) {
		r = -1;
		goto out;
	}
	if (xclone->targetlen){
		/* if snap was defined */
		//support clone only from pithos
		r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
					MF_LOAD);
	} else {
		/* else try to create a new volume */
		XSEGLOG2(&lc, I, "Creating volume");
		if (!xclone->size){
			XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
			r = -1; goto out;
		}
		if (xclone->size > MAX_VOLUME_SIZE) {
			XSEGLOG2(&lc, E, "Requested size %llu > max volume "
					"size %llu", xclone->size, MAX_VOLUME_SIZE);
			r = -1; goto out;
		}
		struct map *map;
		char *target = xseg_get_target(peer->xseg, pr->req);
		//create a new empty map of size
		map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
		if (!map){
			r = -1; goto out;
		}
		/* open map to get exclusive access to map */
		r = open_map(pr, map, 0);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
			XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
			do_dropcache(pr, map);
			r = -1; goto out;
		}
		r = load_map(pr, map);
		if (r >= 0){
			XSEGLOG2(&lc, E, "Map exists %s", map->volume);
			do_close(pr, map);
			r = -1; goto out;
		}
		map->size = xclone->size;
		//populate map with zero objects
		uint64_t nr_objs = xclone->size / block_size;
		if (xclone->size % block_size)
			nr_objs++;
		struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
		if (!map_nodes){
			do_close(pr, map);
			r = -1; goto out;
		}
		uint64_t i;
		for (i = 0; i < nr_objs; i++) {
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
			map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
			map_nodes[i].flags = 0;
			map_nodes[i].objectidx = i;
			map_nodes[i].map = map;
			map_nodes[i].ref = 1;
			map_nodes[i].waiters = 0;
			map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
			r = insert_object(map, &map_nodes[i]);
			if (r < 0){
				do_close(pr, map);
				goto out;
			}
		}
		r = write_map(pr, map);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
			do_close(pr, map);
			goto out;
		}
		XSEGLOG2(&lc, I, "Volume %s created", map->volume);
		r = 0;
		do_close(pr, map); //drop cache here for consistency
	}
out:
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_mapr(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_mapr, pr, target, pr->req->targetlen,
				MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_mapw(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_mapw, pr, target, pr->req->targetlen,
				MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	XSEGLOG2(&lc, D, "Ta: %d", ta);
	ta--;
	return NULL;
}

void * handle_destroy(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* request EXCLUSIVE access, but do not force it.
	 * check in do_destroy whether it succeeded
	 */
	int r = map_action(do_destroy, pr, target, pr->req->targetlen,
				MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_open(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* open with exclusive access; MF_LOAD is also set,
	 * so the map contents are cached as well */
	int r = map_action(do_open, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_close(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	//here we do not want to load
	int r = map_action(do_close, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

void * handle_snapshot(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* request EXCLUSIVE access, but do not force it.
	 * check in do_snapshot whether it succeeded
	 */
	int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
				MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}

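/*
 * dispatch_accepted: map a newly accepted xseg request to its handler
 * and run the handler in a new state thread, so that blocking map
 * operations do not stall the dispatch loop.
 */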
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
			struct xseg_request *req)
{
	//struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	void *(*action)(struct peer_req *) = NULL;

	mio->state = ACCEPTED;
	mio->err = 0;
	mio->cb = NULL;
	switch (pr->req->op) {
		/* primary xseg operations of mapper */
		case X_CLONE: action = handle_clone; break;
		case X_MAPR: action = handle_mapr; break;
		case X_MAPW: action = handle_mapw; break;
		case X_SNAPSHOT: action = handle_snapshot; break;
		case X_INFO: action = handle_info; break;
		case X_DELETE: action = handle_destroy; break;
		case X_OPEN: action = handle_open; break;
		case X_CLOSE: action = handle_close; break;
		default: fprintf(stderr, "mydispatch: unknown op\n"); break;
	}
	if (action){
		ta++;
		st_thread_create(action, pr, 0, 0);
	}
	return 0;
}

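/*
 * dispatch: newly accepted requests are routed to dispatch_accepted;
 * replies to requests we issued ourselves are delivered to the
 * callback (if any) registered in mio->cb, otherwise the waiting peer
 * request is signaled.
 */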
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
		enum dispatch_reason reason)
{
	struct mapperd *mapper = __get_mapperd(peer);
	(void) mapper;
	struct mapper_io *mio = __get_mapper_io(pr);

	if (reason == dispatch_accept)
		dispatch_accepted(peer, pr, req);
	else {
		if (mio->cb)
			mio->cb(pr, req);
		else
			signal_pr(pr);
	}
	return 0;
}

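/*
 * custom_peer_init: allocate the mapperd state and one mapper_io per
 * peer request, parse the blocker/mblocker port arguments, and tune
 * the xseg limits for the peer's port.
 */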
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
	int i;

	//FIXME error checks
	struct mapperd *mapperd = malloc(sizeof(struct mapperd));
	peer->priv = mapperd;
	struct mapperd *mapper = mapperd;
	mapper->hashmaps = xhash_new(3, STRING);

	for (i = 0; i < peer->nr_ops; i++) {
		struct mapper_io *mio = malloc(sizeof(struct mapper_io));
		mio->copyups_nodes = xhash_new(3, INTEGER);
		mio->copyups = 0;
		mio->err = 0;
		peer->peer_reqs[i].priv = mio;
	}

	mapper->bportno = -1;
	mapper->mbportno = -1;
	BEGIN_READ_ARGS(argc, argv);
	READ_ARG_ULONG("-bp", mapper->bportno);
	READ_ARG_ULONG("-mbp", mapper->mbportno);
	END_READ_ARGS();
	if (mapper->bportno == -1){
		XSEGLOG2(&lc, E, "Portno for blocker must be provided");
		return -1;
	}
	if (mapper->mbportno == -1){
		XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
		return -1;
	}

	const struct sched_param param = { .sched_priority = 99 };
	sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
	/* FIXME maybe place it in peer
	 * should be done for each port (sportno to eportno)
	 */
	xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
	xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);

	return 0;
}

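/*
 * wait_reply: synchronously poll all of the peer's ports until the
 * expected request is received, bouncing back any stale replies.
 * Used during finalization, where the state-thread machinery is no
 * longer available.
 */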
/* FIXME this should not be here */
int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
{
	struct xseg *xseg = peer->xseg;
	xport portno_start = peer->portno_start;
	xport portno_end = peer->portno_end;
	struct peer_req *pr;
	xport i;
	int r, c;
	struct xseg_request *received;
	xseg_prepare_wait(xseg, portno_start);
	while (1) {
		XSEGLOG2(&lc, D, "Attempting to check for reply");
		/* drain every port until no request is pending */
		c = 1;
		while (c) {
			c = 0;
			for (i = portno_start; i <= portno_end; i++) {
				received = xseg_receive(xseg, i, 0);
				if (!received)
					continue;
				c = 1;
				r = xseg_get_req_data(xseg, received, (void **) &pr);
				if (r < 0 || !pr || received != expected_req){
					XSEGLOG2(&lc, W, "Received request with no pr data\n");
					xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
					if (p == NoPort){
						XSEGLOG2(&lc, W, "Could not respond stale request");
						xseg_put_request(xseg, received, portno_start);
						continue;
					}
					xseg_signal(xseg, p);
				} else {
					xseg_cancel_wait(xseg, portno_start);
					return 0;
				}
			}
		}
		xseg_wait_signal(xseg, 1000000UL);
	}
}

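/*
 * custom_peer_finalize: on shutdown, release the exclusive lock of
 * every map we hold by issuing a close for each one and waiting for
 * the reply synchronously.
 */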
void custom_peer_finalize(struct peerd *peer)
{
	struct mapperd *mapper = __get_mapperd(peer);
	struct peer_req *pr = alloc_peer_req(peer);
	if (!pr){
		XSEGLOG2(&lc, E, "Cannot get peer request");
		return;
	}
	struct map *map;
	struct xseg_request *req;
	xhash_iter_t it;
	xhashidx key, val;
	xhash_iter_init(mapper->hashmaps, &it);
	while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
		map = (struct map *)val;
		if (!(map->flags & MF_MAP_EXCLUSIVE))
			continue;
		req = __close_map(pr, map);
		if (!req)
			continue;
		wait_reply(peer, req);
		if (!(req->state & XS_SERVED))
			XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
		map->flags &= ~MF_MAP_CLOSING;
		xseg_put_request(peer->xseg, req, pr->portno);
	}
	return;
}

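/* Debugging helpers: dump a single map node or a whole map to stderr. */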
void print_obj(struct map_node *mn)
{
	fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
			(unsigned long long) mn->objectidx, mn->object,
			(unsigned int) mn->objectlen,
			(mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
}

void print_map(struct map *m)
{
	uint64_t nr_objs = m->size / block_size;
	if (m->size % block_size)
		nr_objs++;
	fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
			m->volume, m->volumelen,
			(unsigned long long) m->size,
			(unsigned long long) nr_objs,
			m->version);
	uint64_t i;
	struct map_node *mn;
	if (nr_objs > 1000000) //FIXME to protect against invalid volume size
		return;
	for (i = 0; i < nr_objs; i++) {
		mn = find_object(m, i);
		if (!mn){
			printf("object idx [%llu] not found!\n", (unsigned long long) i);
			continue;
		}
		print_obj(mn);
	}
}

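/*
 * test_map: standalone sanity check for the map (de)serialization
 * helpers. It builds a 100-object map with hashed object names,
 * serializes it with mapheader_to_map()/object_to_map(), reads it back
 * with read_map(), and writes the serialized block out to a file.
 */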
void test_map(struct peerd *peer)
{
	int i, j, ret;
	//struct sha256_ctx sha256ctx;
	unsigned char buf[SHA256_DIGEST_SIZE];
	char buf_new[XSEG_MAX_TARGETLEN + 20];
	struct map *m = malloc(sizeof(struct map));
	strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
	m->volume[XSEG_MAX_TARGETLEN] = 0;
	strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
	buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
	m->volumelen = XSEG_MAX_TARGETLEN;
	m->size = 100 * block_size;
	m->objects = xhash_new(3, INTEGER);
	struct map_node *map_node = calloc(100, sizeof(struct map_node));
	for (i = 0; i < 100; i++) {
		sprintf(buf_new + XSEG_MAX_TARGETLEN, "%u", i);
		gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
		/* hexlify the digest into the object name */
		for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
			sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
		}
		map_node[i].objectidx = i;
		map_node[i].objectlen = XSEG_MAX_TARGETLEN;
		map_node[i].flags = MF_OBJECT_EXIST;
		ret = insert_object(m, &map_node[i]);
	}

	char *data = malloc(block_size);
	mapheader_to_map(m, data);
	uint64_t pos = mapheader_size;

	for (i = 0; i < 100; i++) {
		map_node = find_object(m, i);
		if (!map_node){
			printf("no object node %d \n", i);
			exit(1);
		}
		object_to_map(data+pos, map_node);
		pos += objectsize_in_map;
	}

	struct map *m2 = malloc(sizeof(struct map));
	strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
	m2->volume[XSEG_MAX_TARGETLEN] = 0;
	m2->volumelen = XSEG_MAX_TARGETLEN;

	m2->objects = xhash_new(3, INTEGER);
	ret = read_map(peer, m2, data);

	int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
	ssize_t r, sum = 0;
	while (sum < block_size) {
		r = write(fd, data + sum, block_size - sum);
		if (r < 0){
			printf("write error\n");
			exit(1);
		}
		sum += r;
	}
	close(fd);
	map_node = find_object(m, 0);