/* Copyright 2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <sys/types.h>
#include <xseg/xseg.h>
#include <xtypes/xlock.h>
#include <xtypes/xhash.h>
#include <xseg/protocol.h>
#include <sys/syscall.h>
#include <openssl/sha.h>
/* general mapper flags */
#define MF_LOAD (1 << 0)
#define MF_EXCLUSIVE (1 << 1)
#define MF_FORCE (1 << 2)
#define MF_ARCHIP (1 << 3)

#ifndef SHA256_DIGEST_SIZE
#define SHA256_DIGEST_SIZE 32
#endif

/* hex representation of sha256 value takes up double the sha256 size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
#define block_size (1<<22) //FIXME this should be defined here?

/* transparency byte + max object len in disk */
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)

/* Map header contains:
 * 	map version
 * 	volume size
 */
#define mapheader_size (sizeof(uint32_t) + sizeof(uint64_t))
#define MAPPER_PREFIX "archip_"
#define MAPPER_PREFIX_LEN 7

#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)

#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
#endif

#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)

#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
#endif
#define MAX_VOLUME_SIZE \
	((uint64_t) (((block_size - mapheader_size) / objectsize_in_map) * block_size))
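
/*
 * Worked numbers (illustrative, not part of the original source): with
 * block_size = 1<<22 (4 MiB), mapheader_size = 4 + 8 = 12 bytes and
 * objectsize_in_map = 33 bytes, one map block indexes
 * (4194304 - 12) / 33 = 127099 objects, so
 * MAX_VOLUME_SIZE = 127099 * 4 MiB, roughly 496 GiB per volume.
 */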
//char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";

/* Pithos considers this a block full of zeros, so should we.
 * It is actually the sha256 hash of the empty string.
 */
char *zero_block = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
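
/*
 * Sketch (illustrative, compiled only with -DMAPPER_EXAMPLES): zero_block is
 * the well-known SHA-256 digest of zero bytes of input, which can be checked
 * with the same OpenSSL one-shot SHA256() API used elsewhere in this file.
 */
#ifdef MAPPER_EXAMPLES
#include <assert.h>
static void example_zero_block(void)
{
	unsigned char digest[SHA256_DIGEST_SIZE];
	char hex[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
	int i;

	/* hash the empty input and hexlify the digest by hand */
	SHA256((const unsigned char *) "", 0, digest);
	for (i = 0; i < SHA256_DIGEST_SIZE; i++)
		sprintf(hex + 2 * i, "%02x", digest[i]);
	assert(!strncmp(hex, zero_block, ZERO_BLOCK_LEN));
}
#endif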
/* dispatch_internal mapper states */

typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
/* mapper object flags */
#define MF_OBJECT_EXIST (1 << 0)
#define MF_OBJECT_COPYING (1 << 1)
#define MF_OBJECT_WRITING (1 << 2)
#define MF_OBJECT_DELETING (1 << 3)
#define MF_OBJECT_DESTROYED (1 << 5)
#define MF_OBJECT_SNAPSHOTTING (1 << 6)

#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
				MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)

struct map_node {
	uint32_t flags;
	uint32_t objectidx;
	uint32_t objectlen;
	char object[MAX_OBJECT_LEN + 1]; /* NULL terminated string */
	struct map *map;
	uint32_t ref;
	uint32_t waiters;
	st_cond_t cond;
};
#define wait_on_pr(__pr, __condition__) \
	do { \
		ta--; \
		__get_mapper_io(pr)->active = 0; \
		XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
		st_cond_wait(__pr->cond); \
	} while (__condition__)

#define wait_on_mapnode(__mn, __condition__) \
	do { \
		ta--; \
		__mn->waiters++; \
		XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, " \
				"ta: %u", __mn, __mn->object, __mn->waiters, ta); \
		st_cond_wait(__mn->cond); \
	} while (__condition__)

#define wait_on_map(__map, __condition__) \
	do { \
		ta--; \
		__map->waiters++; \
		XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
				__map, __map->volume, __map->waiters, ta); \
		st_cond_wait(__map->cond); \
	} while (__condition__)

#define signal_pr(__pr) \
	do { \
		if (!__get_mapper_io(pr)->active){ \
			ta++; \
			XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta);\
			__get_mapper_io(pr)->active = 1;\
			st_cond_signal(__pr->cond); \
		} \
	} while (0)

#define signal_map(__map) \
	do { \
		if (__map->waiters) { \
			ta += __map->waiters; \
			XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, " \
					"ta: %u", __map, __map->volume, __map->waiters, ta); \
			__map->waiters = 0; \
			st_cond_signal(__map->cond); \
		} \
	} while (0)

#define signal_mapnode(__mn) \
	do { \
		if (__mn->waiters) { \
			ta += __mn->waiters; \
			XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: " \
					"%u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
			__mn->waiters = 0; \
			st_cond_broadcast(__mn->cond); \
		} \
	} while (0)
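
/*
 * Usage pattern (illustrative): the synchronous helpers below submit a
 * request and block their state thread until the dispatch path signals
 * completion. For example, close_map() does
 *
 *	req = __close_map(pr, map);
 *	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
 *
 * while completion callbacks such as copyup_cb() end with signal_pr(pr).
 */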
/* map flags */
#define MF_MAP_LOADING (1 << 0)
#define MF_MAP_DESTROYED (1 << 1)
#define MF_MAP_WRITING (1 << 2)
#define MF_MAP_DELETING (1 << 3)
#define MF_MAP_DROPPING_CACHE (1 << 4)
#define MF_MAP_EXCLUSIVE (1 << 5)
#define MF_MAP_OPENING (1 << 6)
#define MF_MAP_CLOSING (1 << 7)
#define MF_MAP_DELETED (1 << 8)
#define MF_MAP_SNAPSHOTTING (1 << 9)

#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
				MF_MAP_DROPPING_CACHE|MF_MAP_OPENING| \
				MF_MAP_SNAPSHOTTING)

struct map {
	uint32_t version;
	uint32_t flags;
	uint64_t size;
	uint32_t volumelen;
	char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
	xhash_t *objects; /* obj_index --> map_node */
	struct xq pending;
	uint32_t ref;
	uint32_t waiters;
	st_cond_t cond;
};
struct mapperd {
	xport bportno;	/* blocker that accesses data */
	xport mbportno;	/* blocker that accesses maps */
	xhash_t *hashmaps; // hash_function(target) --> struct map
};

struct mapper_io {
	volatile uint32_t copyups;	/* nr of copyups pending, issued by this mapper io */
	xhash_t *copyups_nodes;		/* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
	struct map_node *copyup_node;
	volatile int err;		/* error flag */
	volatile uint64_t del_pending;
	volatile uint64_t snap_pending;
	cb_t cb;
	volatile int active;
	enum mapper_state state;
};

struct mapperd *mapper;
void print_map(struct map *m);

void custom_peer_usage()
{
	fprintf(stderr, "Custom peer options: \n"
			"-bp  : port for block blocker(!)\n"
			"-mbp : port for map blocker\n");
}
static inline struct mapperd * __get_mapperd(struct peerd *peer)
{
	return (struct mapperd *) peer->priv;
}

static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
{
	return (struct mapper_io *) pr->priv;
}

static inline uint64_t calc_map_obj(struct map *map)
{
	uint64_t nr_objs = map->size / block_size;
	if (map->size % block_size)
		nr_objs++;
	return nr_objs;
}
static uint32_t calc_nr_obj(struct xseg_request *req)
{
	int nr_objs = 1;
	uint64_t rem_size = req->size;
	uint64_t obj_offset = req->offset & (block_size - 1); //modulo
	uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
	rem_size -= obj_size;
	while (rem_size > 0) {
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
		nr_objs++;
	}
	return nr_objs;
}
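
/*
 * Worked example (illustrative, compiled only with -DMAPPER_EXAMPLES): with
 * block_size = 4 MiB, a request at offset 3 MiB with size 6 MiB touches
 * 1 MiB of the first object, all of the second and 1 MiB of the third,
 * so calc_nr_obj() returns 3.
 */
#ifdef MAPPER_EXAMPLES
static void example_calc_nr_obj(void)
{
	struct xseg_request req;

	memset(&req, 0, sizeof(req));
	req.offset = 3ULL << 20;	/* 3 MiB into the volume */
	req.size = 6ULL << 20;		/* 6 MiB long */
	assert(calc_nr_obj(&req) == 3);
}
#endif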
/*
 * Hexlify helpers.
 * Unsafe. Doesn't check if data length is odd!
 */
static void hexlify(unsigned char *data, char *hex)
{
	int i;
	for (i = 0; i < SHA256_DIGEST_LENGTH; i++)
		sprintf(hex + 2 * i, "%02x", data[i]);
}

static void unhexlify(char *hex, unsigned char *data)
{
	int i;
	char c;
	for (i = 0; i < SHA256_DIGEST_LENGTH; i++){
		data[i] = 0;
		c = hex[2 * i];
		if (isxdigit(c))
			c = isdigit(c) ? c - '0' : tolower(c) - 'a' + 10;
		else
			c = 0;
		data[i] |= (c << 4) & 0xF0;
		c = hex[2 * i + 1];
		if (isxdigit(c))
			c = isdigit(c) ? c - '0' : tolower(c) - 'a' + 10;
		else
			c = 0;
		data[i] |= c & 0x0F;
	}
}
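
/*
 * Sketch (illustrative, compiled only with -DMAPPER_EXAMPLES): hexlify() and
 * unhexlify() are inverses over a 32-byte digest; byte 0xe3 becomes the two
 * characters "e3" and back.
 */
#ifdef MAPPER_EXAMPLES
static void example_hexlify_roundtrip(void)
{
	unsigned char digest[SHA256_DIGEST_LENGTH], back[SHA256_DIGEST_LENGTH];
	char hex[HEXLIFIED_SHA256_DIGEST_SIZE + 1];
	int i;

	for (i = 0; i < SHA256_DIGEST_LENGTH; i++)
		digest[i] = (unsigned char) (i * 7);
	hexlify(digest, hex);
	hex[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	unhexlify(hex, back);
	assert(!memcmp(digest, back, SHA256_DIGEST_LENGTH));
}
#endif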
void merkle_hash(unsigned char *hashes, unsigned long len,
		unsigned char hash[SHA256_DIGEST_SIZE])
{
	uint32_t i, l, s = 2;
	uint32_t nr = len/SHA256_DIGEST_SIZE;
	unsigned char *buf;
	unsigned char tmp_hash[SHA256_DIGEST_SIZE];

	if (!nr){
		SHA256(hashes, 0, hash);
		return;
	}
	if (nr == 1){
		memcpy(hash, hashes, SHA256_DIGEST_SIZE);
		return;
	}
	while (s < nr)
		s = s << 1;
	buf = malloc(sizeof(unsigned char) * SHA256_DIGEST_SIZE * s);
	memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
	memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
	for (l = s; l > 1; l = l/2) {
		for (i = 0; i < l; i += 2) {
			SHA256(buf + (i * SHA256_DIGEST_SIZE),
					2 * SHA256_DIGEST_SIZE, tmp_hash);
			memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
					tmp_hash, SHA256_DIGEST_SIZE);
		}
	}
	memcpy(hash, buf, SHA256_DIGEST_SIZE);
	free(buf);
}
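
/*
 * Sketch (illustrative, compiled only with -DMAPPER_EXAMPLES): merkle_hash()
 * pads the hash list to the next power of two with zero entries and folds
 * adjacent pairs bottom-up, so for exactly two leaf digests the root is
 * simply SHA256(h0 || h1).
 */
#ifdef MAPPER_EXAMPLES
static void example_merkle_hash(void)
{
	unsigned char leaves[2 * SHA256_DIGEST_SIZE];
	unsigned char root[SHA256_DIGEST_SIZE], expected[SHA256_DIGEST_SIZE];

	memset(leaves, 0xaa, SHA256_DIGEST_SIZE);			/* h0 */
	memset(leaves + SHA256_DIGEST_SIZE, 0xbb, SHA256_DIGEST_SIZE);	/* h1 */
	merkle_hash(leaves, sizeof(leaves), root);
	SHA256(leaves, sizeof(leaves), expected);
	assert(!memcmp(root, expected, SHA256_DIGEST_SIZE));
}
#endif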
/*
 * Maps handling functions
 */

static struct map * find_map(struct mapperd *mapper, char *volume)
{
	struct map *m = NULL;
	int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
				(xhashidx *) &m);
	if (r < 0)
		return NULL;
	return m;
}

static struct map * find_map_len(struct mapperd *mapper, char *target,
					uint32_t targetlen, uint32_t flags)
{
	char buf[XSEG_MAX_TARGETLEN+1];
	if (flags & MF_ARCHIP){
		strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
		strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
		buf[MAPPER_PREFIX_LEN + targetlen] = 0;
		targetlen += MAPPER_PREFIX_LEN;
	} else {
		strncpy(buf, target, targetlen);
		buf[targetlen] = 0;
	}

	if (targetlen > MAX_VOLUME_LEN){
		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
					targetlen, MAX_VOLUME_LEN);
		return NULL;
	}

	XSEGLOG2(&lc, D, "looking up map %s, len %u",
			buf, targetlen);
	return find_map(mapper, buf);
}
static int insert_map(struct mapperd *mapper, struct map *map)
{
	int r = -1;

	if (find_map(mapper, map->volume)){
		XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
		goto out;
	}

	XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
			map->volume, strlen(map->volume), (unsigned long) map);
	r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
		if (!new_hashmap){
			XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
			goto out;
		}
		mapper->hashmaps = new_hashmap;
		r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	}
out:
	return r;
}
static int remove_map(struct mapperd *mapper, struct map *map)
{
	int r = -1;

	//assert no pending pr on map
	r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
		if (!new_hashmap){
			XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
			return r;
		}
		mapper->hashmaps = new_hashmap;
		r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	}
	return r;
}
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
{
	int r;
	xport p;
	void *dummy;
	struct peerd *peer = pr->peer;
	struct xseg_request *req;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Closing map %s", map->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				map->volume);
		goto out_err;
	}
	r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				map->volume);
		goto out_put;
	}
	char *reqtarget = xseg_get_target(peer->xseg, req);
	if (!reqtarget)
		goto out_put;
	strncpy(reqtarget, map->volume, req->targetlen);
	req->op = X_RELEASE;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				map->volume);
		goto out_put;
	}
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				map->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	map->flags |= MF_MAP_CLOSING;

	XSEGLOG2(&lc, I, "Map %s closing", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	return NULL;
}
static int close_map(struct peer_req *pr, struct map *map)
{
	int err;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;

	req = __close_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_CLOSING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
	if (err)
		return -1;
	return 0;
}
static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
				char *target, uint32_t targetlen, struct map **m)
{
	struct mapperd *mapper = __get_mapperd(peer);
	int r;

	*m = find_map_len(mapper, target, targetlen, 0);
	if (*m) {
		XSEGLOG2(&lc, D, "Found map %s (%lx)", (*m)->volume, (unsigned long) *m);
		if ((*m)->flags & MF_MAP_NOT_READY) {
			__xq_append_tail(&(*m)->pending, (xqindex) pr);
			XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
			return 1; /* pending */
		//} else if ((*m)->flags & MF_MAP_DESTROYED){
		//	return -1;
		} else {
			XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
			return 0;
		}
	}
	r = open_map(peer, pr, target, targetlen, 0);
	if (r < 0)
		return -1; //error
	return 1; /* pending */
}
/*
 * Object handling functions
 */

struct map_node *find_object(struct map *map, uint64_t obj_index)
{
	struct map_node *mn;
	int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
	if (r < 0)
		return NULL;
	return mn;
}

static int insert_object(struct map *map, struct map_node *mn)
{
	//FIXME no find object first
	int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		unsigned long shift = xhash_grow_size_shift(map->objects);
		map->objects = xhash_resize(map->objects, shift, NULL);
		if (!map->objects)
			return -1;
		r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	}
	return r;
}
/*
 * map read/write functions
 *
 * version 0 -> pithos map
 * version 1 -> archipelago version 1
 *
 * functions implemented per version:
 *
 * 	int read_object(struct map_node *mn, unsigned char *buf)
 * 	int prepare_write_object(struct peer_req *pr, struct map *map,
 * 				struct map_node *mn, struct xseg_request *req)
 * 	int read_map(struct map *m, unsigned char * data)
 * 	int prepare_write_map(struct peer_req *pr, struct map *map,
 * 				struct xseg_request *req)
 */

struct map_functions {
	int (*read_object)(struct map_node *mn, unsigned char *buf);
	int (*prepare_write_object)(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req);
	int (*read_map)(struct map *m, unsigned char * data);
	int (*prepare_write_map)(struct peer_req *pr, struct map *map,
				struct xseg_request *req);
};
/* version 0 functions */

/* no on-disk header */
#define v0_mapheader_size 0
/* just the unhexlified name */
#define v0_objectsize_in_map SHA256_DIGEST_SIZE

static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
{
	hexlify(buf, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
	mn->flags = MF_OBJECT_EXIST;
	return 0;
}

static void v0_object_to_map(struct map_node *mn, unsigned char *data)
{
	unhexlify(mn->object, data);
}

static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
			struct map_node *mn, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;

	unsigned char *data = xseg_get_data(pr->peer->xseg, req);
	v0_object_to_map(mn, data);
	return 0;
}
static int read_map_v0(struct map *m, unsigned char * data)
{
	int r;
	struct map_node *map_node;
	uint64_t i;
	uint64_t pos = 0;
	uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
	XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);
	map_node = calloc(max_nr_objs, sizeof(struct map_node));
	if (!map_node)
		return -1;
	for (i = 0; i < max_nr_objs; i++) {
		if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
			break;
		map_node[i].objectidx = i;
		map_node[i].map = m;
		map_node[i].waiters = 0;
		map_node[i].ref = 1;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v0(&map_node[i], data+pos);
		pos += v0_objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
	}
	XSEGLOG2(&lc, D, "Found %llu objects", i);
	m->size = i * block_size;
	return 0;
}
static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
				struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
	struct map_node *mn;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen,
			v0_mapheader_size + max_objidx * v0_objectsize_in_map);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);

	req->size = req->datalen;
	req->offset = 0;

	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
		if (!mn){
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, map->volume);
			return -1;
		}
		v0_object_to_map(mn, (unsigned char *)(data+pos));
		pos += v0_objectsize_in_map;
	}
	XSEGLOG2(&lc, D, "Prepared %llu objects", i);
	return 0;
}
/* static struct map_functions map_functions_v0 = { */
/* 	.read_object = read_object_v0, */
/* 	.read_map = read_map_v0, */
/* 	.prepare_write_object = prepare_write_object_v0, */
/* 	.prepare_write_map = prepare_write_map_v0 */
/* }; */
#define map_functions_v0 { \
			.read_object = read_object_v0, \
			.read_map = read_map_v0, \
			.prepare_write_object = prepare_write_object_v0, \
			.prepare_write_map = prepare_write_map_v0 \
			}
/* version 1 functions */

/* transparency byte + max object len in disk */
#define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)

/* Map header contains:
 * 	map version
 * 	volume size
 */
#define v1_mapheader_size (sizeof(uint32_t) + sizeof(uint64_t))

static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
{
	char c = buf[0];
	mn->flags = 0;
	if (c){
		mn->flags |= MF_OBJECT_EXIST;
		strcpy(mn->object, MAPPER_PREFIX);
		hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
		mn->object[MAX_OBJECT_LEN] = 0;
		mn->objectlen = strlen(mn->object);
	}
	else {
		mn->flags &= ~MF_OBJECT_EXIST;
		hexlify(buf+1, mn->object);
		mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
		mn->objectlen = strlen(mn->object);
	}
	return 0;
}
static inline void v1_object_to_map(char* buf, struct map_node *mn)
{
	buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
	if (buf[0]){
		/* strip common prefix */
		unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
	}
	else {
		unhexlify(mn->object, (unsigned char *)(buf+1));
	}
}
static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;

	char *data = xseg_get_data(pr->peer->xseg, req);
	v1_object_to_map(data, mn);
	return 0;
}
static int read_map_v1(struct map *m, unsigned char * data)
{
	int r;
	struct map_node *map_node;
	uint64_t i;
	uint64_t pos = 0;
	uint64_t nr_objs;

	/* read header */
	m->version = *(uint32_t *) (data + pos);
	pos += sizeof(uint32_t);
	m->size = *(uint64_t *) (data + pos);
	pos += sizeof(uint64_t);

	/* read objects */
	nr_objs = m->size / block_size;
	if (m->size % block_size)
		nr_objs++;
	map_node = calloc(nr_objs, sizeof(struct map_node));
	if (!map_node)
		return -1;

	for (i = 0; i < nr_objs; i++) {
		map_node[i].map = m;
		map_node[i].objectidx = i;
		map_node[i].waiters = 0;
		map_node[i].ref = 1;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v1(&map_node[i], data+pos);
		pos += objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
	}
	return 0;
}
static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
				struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
	struct map_node *mn;

	int r = xseg_prep_request(peer->xseg, req, m->volumelen,
			v1_mapheader_size + max_objidx * v1_objectsize_in_map);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
		return -1;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, m->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);

	memcpy(data + pos, &m->version, sizeof(m->version));
	pos += sizeof(m->version);
	memcpy(data + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);

	req->size = req->datalen;
	req->offset = 0;

	for (i = 0; i < max_objidx; i++) {
		mn = find_object(m, i);
		if (!mn){
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, m->volume);
			return -1;
		}
		v1_object_to_map(data+pos, mn);
		pos += v1_objectsize_in_map;
	}
	return 0;
}
/* static struct map_functions map_functions_v1 = { */
/* 	.read_object = read_object_v1, */
/* 	.read_map = read_map_v1, */
/* 	.prepare_write_object = prepare_write_object_v1, */
/* 	.prepare_write_map = prepare_write_map_v1 */
/* }; */
#define map_functions_v1 { \
			.read_object = read_object_v1, \
			.read_map = read_map_v1, \
			.prepare_write_object = prepare_write_object_v1, \
			.prepare_write_map = prepare_write_map_v1 \
			}

static struct map_functions map_functions[] = { map_functions_v0,
						map_functions_v1 };
#define MAP_LATEST_VERSION 1
/* end of functions */
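
/*
 * Dispatch sketch (illustrative, compiled only with -DMAPPER_EXAMPLES): the
 * map version recorded in each struct map indexes the map_functions table,
 * so pithos (v0) and archipelago (v1) on-disk layouts share one code path.
 */
#ifdef MAPPER_EXAMPLES
static int example_version_dispatch(struct map *map, unsigned char *data)
{
	if (map->version > MAP_LATEST_VERSION)
		return -1;	/* unknown layout, mirrors the check in read_map() */
	return map_functions[map->version].read_map(map, data);
}
#endif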
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
{
	hexlify(buf, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
	mn->flags = MF_OBJECT_EXIST;
}

static inline void map_to_object(struct map_node *mn, unsigned char *buf)
{
	char c = buf[0];
	mn->flags = 0;
	if (c){
		mn->flags |= MF_OBJECT_EXIST;
		strcpy(mn->object, MAPPER_PREFIX);
		hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
		mn->object[MAX_OBJECT_LEN] = 0;
		mn->objectlen = strlen(mn->object);
	}
	else {
		hexlify(buf+1, mn->object);
		mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
		mn->objectlen = strlen(mn->object);
	}
}

static inline void object_to_map(char* buf, struct map_node *mn)
{
	buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
	if (buf[0]){
		/* strip common prefix */
		unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
	}
	else {
		unhexlify(mn->object, (unsigned char *)(buf+1));
	}
}
static inline void mapheader_to_map(struct map *m, char *buf)
{
	uint64_t pos = 0;
	memcpy(buf + pos, &m->version, sizeof(m->version));
	pos += sizeof(m->version);
	memcpy(buf + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);
}
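
/*
 * On-disk layout sketch (illustrative, derived from the definitions above):
 * a v1 map block starts with the 12-byte header written by
 * mapheader_to_map() (uint32_t version, uint64_t size), followed by one
 * 33-byte entry per object: a transparency byte (does the object exist as
 * a private archipelago object?) plus the 32-byte unhexlified object name,
 * so entry i lives at byte offset 12 + 33 * i.
 */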
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
				struct map *map, struct map_node *mn)
{
	int r;
	void *dummy;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		goto out_err;
	}
	r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare write object");
		goto out_put;
	}
	req->op = X_WRITE;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		goto out_put;
	}
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
				"(Map: %s [%llu])",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	if (r < 0)
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	XSEGLOG2(&lc, I, "Writing object %s \n\t"
			"Map: %s [%llu]",
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
			"(Map: %s [%llu])",
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	return NULL;
}
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
{
	int r;
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
		goto out_err;
	}
	r = map_functions[map->version].prepare_write_map(pr, map, req);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare write map");
		goto out_put;
	}
	req->op = X_WRITE;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				map->volume);
		goto out_put;
	}
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				map->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	if (r < 0)
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	map->flags |= MF_MAP_WRITING;
	XSEGLOG2(&lc, I, "Writing map %s", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
	return NULL;
}
static int write_map(struct peer_req* pr, struct map *map)
{
	int r = 0;
	struct peerd *peer = pr->peer;
	struct xseg_request *req = __write_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	if (req->state & XS_FAILED)
		r = -1;
	xseg_put_request(peer->xseg, req, pr->portno);
	map->flags &= ~MF_MAP_WRITING;
	return r;
}
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
{
	int r;
	xport p;
	void *dummy;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Loading map %s", m->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				m->volume);
		goto out_fail;
	}
	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				m->volume);
		goto out_put;
	}
	char *reqtarget = xseg_get_target(peer->xseg, req);
	if (!reqtarget)
		goto out_put;
	strncpy(reqtarget, m->volume, req->targetlen);
	req->op = X_READ;
	req->size = block_size;
	req->offset = 0;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				m->volume);
		goto out_put;
	}
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				m->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);

	m->flags |= MF_MAP_LOADING;
	XSEGLOG2(&lc, I, "Map %s loading", m->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
	return NULL;
}
static int read_map(struct map *map, unsigned char *buf)
{
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);

	int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
	if (r) {
		XSEGLOG2(&lc, E, "Read zeros");
		return -1;
	}
	//type 1: archip map, type 0: pithos map
	int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
	XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
	uint32_t version;
	if (type)
		version = *(uint32_t *) (buf); //version should always be the first uint32_t
	else
		version = 0;
	if (version > MAP_LATEST_VERSION){
		XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
				map->volume, version);
		return -1;
	}

	r = map_functions[version].read_map(map, buf);
	if (r < 0){
		XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
		return -1;
	}

	XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
	return 0;
}
static int load_map(struct peer_req *pr, struct map *map)
{
	int r;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	req = __load_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	map->flags &= ~MF_MAP_LOADING;
	if (req->state & XS_FAILED){
		XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
		xseg_put_request(peer->xseg, req, pr->portno);
		return -1;
	}
	r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
	xseg_put_request(peer->xseg, req, pr->portno);
	return r;
}
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
						uint32_t flags)
{
	int r;
	xport p;
	void *dummy;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);

	XSEGLOG2(&lc, I, "Opening map %s", m->volume);

	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
				m->volume);
		goto out_fail;
	}
	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
				m->volume);
		goto out_put;
	}
	char *reqtarget = xseg_get_target(peer->xseg, req);
	if (!reqtarget)
		goto out_put;
	strncpy(reqtarget, m->volume, req->targetlen);
	req->op = X_ACQUIRE;
	req->size = block_size;
	req->offset = 0;
	if (!(flags & MF_FORCE))
		req->flags = XF_NOSYNC;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
				m->volume);
		goto out_put;
	}
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
				m->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);

	m->flags |= MF_MAP_OPENING;
	XSEGLOG2(&lc, I, "Map %s opening", m->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
	return NULL;
}
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
	int err;
	struct xseg_request *req;
	struct peerd *peer = pr->peer;

	req = __open_map(pr, map, flags);
	if (!req)
		return -1;
	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_OPENING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
	if (err)
		return -1;
	map->flags |= MF_MAP_EXCLUSIVE;
	return 0;
}
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
{
	int r = 0;
	if (mn){
		XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
				req, mn, mio);
		r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
		if (r == -XHASH_ERESIZE) {
			xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
			if (!new_hashmap)
				return -1;
			mio->copyups_nodes = new_hashmap;
			r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
		}
		if (r < 0)
			XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
					req, mn, mio);
	}
	else {
		XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
				req, mio);
		r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
		if (r == -XHASH_ERESIZE) {
			xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
			xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
			if (!new_hashmap)
				return -1;
			mio->copyups_nodes = new_hashmap;
			r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
		}
		if (r < 0)
			XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
					req, mio);
	}
	return r;
}

static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
{
	struct map_node *mn;
	int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
	if (r < 0){
		XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
		return NULL;
	}
	XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
	return mn;
}
static struct xseg_request * __snapshot_object(struct peer_req *pr,
						struct map_node *mn)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	//struct map *map = mn->map;
	int r;
	xport p;
	void *dummy;

	//assert mn->volume != zero_block
	//assert mn->flags & MF_OBJECT_EXIST
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	r = xseg_prep_request(peer->xseg, req, mn->objectlen,
				sizeof(struct xseg_request_snapshot));
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
		goto out_put;
	}

	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, mn->object, req->targetlen);

	struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
	xsnapshot->target[0] = 0;
	xsnapshot->targetlen = 0;

	req->offset = 0;
	req->size = block_size;
	req->op = X_SNAPSHOT;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
		goto out_mapper_unset;
	}
	xseg_signal(peer->xseg, p);

	mn->flags |= MF_OBJECT_SNAPSHOTTING;
	XSEGLOG2(&lc, I, "Snapshotting object %s", mn->object);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
	return NULL;
}
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
{
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map *map = mn->map;
	int r = -1;
	xport p;
	void *dummy;
	uint32_t tmplen;
	uint32_t newtargetlen;
	char new_target[MAX_OBJECT_LEN + 1];
	unsigned char sha[SHA256_DIGEST_SIZE];

	strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);

	char tmp[XSEG_MAX_TARGETLEN + 1];
	/* use the volume name and the object index as seed of the new name */
	strncpy(tmp, map->volume, map->volumelen);
	sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
	tmp[XSEG_MAX_TARGETLEN] = 0;
	tmplen = strlen(tmp);
	SHA256((unsigned char *)tmp, tmplen, sha);
	hexlify(sha, new_target+MAPPER_PREFIX_LEN);
	newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
	new_target[newtargetlen] = 0;

	if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
		goto copyup_zeroblock;

	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	r = xseg_prep_request(peer->xseg, req, newtargetlen,
				sizeof(struct xseg_request_copy));
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
		goto out_put;
	}

	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, new_target, req->targetlen);

	struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
	strncpy(xcopy->target, mn->object, mn->objectlen);
	xcopy->targetlen = mn->objectlen;

	req->offset = 0;
	req->size = block_size;
	req->op = X_COPY;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
		goto out_mapper_unset;
	}
	xseg_signal(peer->xseg, p);

	mn->flags |= MF_OBJECT_COPYING;
	XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
	return NULL;

copyup_zeroblock:
	XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
			" Proceeding in writing the new object in map");
	/* construct a tmp map_node for writing purposes */
	struct map_node newmn = *mn;
	newmn.flags = MF_OBJECT_EXIST;
	strncpy(newmn.object, new_target, newtargetlen);
	newmn.object[newtargetlen] = 0;
	newmn.objectlen = newtargetlen;
	newmn.objectidx = mn->objectidx;
	req = object_write(peer, pr, map, &newmn);
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		return NULL;
	if (!req){
		XSEGLOG2(&lc, E, "Object write returned error for object %s"
				"\n\t of map %s [%llu]",
				mn->object, map->volume, (unsigned long long) mn->objectidx);
		return NULL;
	}
	mn->flags |= MF_OBJECT_WRITING;
	XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
	return req;
}
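
/*
 * Naming sketch (illustrative): copyup targets get content-independent names
 * derived from the parent volume and object index. For a map "archip_myvol"
 * and object index 5, the new object is named
 * MAPPER_PREFIX + hexlify(SHA256("archip_myvol_5")), which is always
 * MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE = 71 characters long.
 */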
static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
{
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->bportno, X_ALLOC);
	XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
		goto out_err;
	}
	int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
		goto out_put;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, mn->object, req->targetlen);
	req->op = X_DELETE;
	req->size = req->datalen;
	req->offset = 0;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
		goto out_put;
	}
	r = __set_copyup_node(mio, req, mn);
	if (r < 0)
		goto out_unset;
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
		goto out_mapper_unset;
	}
	r = xseg_signal(peer->xseg, p);
	mn->flags |= MF_OBJECT_DELETING;
	XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
	return req;

out_mapper_unset:
	__set_copyup_node(mio, req, NULL);
out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Object %s deletion failed", mn->object);
	return NULL;
}
static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
{
	void *dummy;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
	XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
	if (!req){
		XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
		goto out_err;
	}
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
		goto out_put;
	}
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->op = X_DELETE;
	req->size = req->datalen;
	req->offset = 0;
	r = xseg_set_req_data(peer->xseg, req, pr);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
		goto out_put;
	}
	/* do not check return value. just make sure there is no node set */
	__set_copyup_node(mio, req, NULL);
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
	if (p == NoPort){
		XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
		goto out_unset;
	}
	r = xseg_signal(peer->xseg, p);
	map->flags |= MF_MAP_DELETING;
	XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
	return req;

out_unset:
	xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
	xseg_put_request(peer->xseg, req, pr->portno);
out_err:
	XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
	return NULL;
}
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
{
	struct map_node *mn = find_object(map, index);
	if (mn)
		mn->ref++;
	return mn;
}

static inline void put_mapnode(struct map_node *mn)
{
	mn->ref--;
	if (!mn->ref){
		//clean up mn
		st_cond_destroy(mn->cond);
	}
}

static inline void __get_map(struct map *map)
{
	map->ref++;
}

static inline void put_map(struct map *map)
{
	struct map_node *mn;
	uint64_t i;

	XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
	map->ref--;
	if (!map->ref){
		for (i = 0; i < calc_map_obj(map); i++) {
			mn = get_mapnode(map, i);
			if (mn) {
				//make sure all pending operations on all objects are completed
				//this should never happen...
				if (mn->flags & MF_OBJECT_NOT_READY)
					wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
				mn->flags |= MF_OBJECT_DESTROYED;
				put_mapnode(mn); //matching mn->ref = 1 on mn init
				put_mapnode(mn); //matching get_mapnode above
				//assert mn->ref == 0;
			}
		}
		mn = find_object(map, 0);
		if (mn)
			free(mn); //free the calloc'ed map_node array
		XSEGLOG2(&lc, I, "Freed map %s", map->volume);
		free(map);
	}
}
static struct map * create_map(struct mapperd *mapper, char *name,
				uint32_t namelen, uint32_t flags)
{
	int r;
	if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
				namelen, MAX_VOLUME_LEN);
		return NULL;
	}
	struct map *m = malloc(sizeof(struct map));
	if (!m){
		XSEGLOG2(&lc, E, "Cannot allocate map");
		return NULL;
	}
	if (flags & MF_ARCHIP){
		strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
		strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
		m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
		m->volumelen = MAPPER_PREFIX_LEN + namelen;
		m->version = 1; /* keep this hardcoded for now */
	}
	else {
		strncpy(m->volume, name, namelen);
		m->volume[namelen] = 0;
		m->volumelen = namelen;
		m->version = 0; /* version 0 should be pithos maps */
	}
	m->flags = 0;
	m->objects = xhash_new(3, INTEGER);
	if (!m->objects){
		XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
				m->volume);
		goto out_err;
	}
	m->ref = 1;
	m->waiters = 0;
	m->cond = st_cond_new(); //FIXME err check;
	r = insert_map(mapper, m);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
		goto out_hash;
	}
	return m;

out_hash:
	xhash_free(m->objects);
out_err:
	XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
	free(m);
	return NULL;
}
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);

	__set_copyup_node(mio, req, NULL);

	//assert req->op = X_DELETE;
	//assert pr->req->op = X_DELETE only map deletions make delete requests
	//assert mio->del_pending > 0
	XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
	mio->del_pending--;

	if (req->state & XS_FAILED){
		mio->err = 1;
	}
	if (mn){
		XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
				mn, mn->object, mio, req);
		// assert mn->flags & MF_OBJECT_DELETING
		mn->flags &= ~MF_OBJECT_DELETING;
		mn->flags |= MF_OBJECT_DESTROYED;
		signal_mapnode(mn);
		/* put mapnode here, matches get_mapnode on do_destroy */
		put_mapnode(mn);
	} else {
		XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
				mio, req);
	}
	xseg_put_request(peer->xseg, req, pr->portno);
	signal_pr(pr);
}
void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);
	if (!mn){
		XSEGLOG2(&lc, E, "Cannot get map node");
		goto out_err;
	}
	__set_copyup_node(mio, req, NULL);

	if (req->state & XS_FAILED){
		if (req->op == X_DELETE){
			XSEGLOG2(&lc, E, "Delete req failed");
			goto out_ok;
		}
		XSEGLOG2(&lc, E, "Req failed");
		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
		mn->flags &= ~MF_OBJECT_WRITING;
		goto out_err;
	}

	if (req->op == X_WRITE) {
		char old_object_name[MAX_OBJECT_LEN + 1];
		uint32_t old_objectlen;

		char *target = xseg_get_target(peer->xseg, req);
		//assert mn->flags & MF_OBJECT_WRITING
		mn->flags &= ~MF_OBJECT_WRITING;
		strncpy(old_object_name, mn->object, mn->objectlen);
		old_objectlen = mn->objectlen;

		struct map_node tmp;
		char *data = xseg_get_data(peer->xseg, req);
		map_to_object(&tmp, (unsigned char *) data);
		mn->flags &= ~MF_OBJECT_EXIST;

		strncpy(mn->object, tmp.object, tmp.objectlen);
		mn->object[tmp.objectlen] = 0;
		mn->objectlen = tmp.objectlen;
		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
		//signal_mapnode since snapshot was successful
		signal_mapnode(mn);

		//do delete old object
		strncpy(tmp.object, old_object_name, old_objectlen);
		tmp.object[old_objectlen] = 0;
		tmp.objectlen = old_objectlen;
		tmp.flags = MF_OBJECT_EXIST;
		struct xseg_request *xreq = __delete_object(pr, &tmp);
		if (!xreq){
			//just a warning. Snapshot was successful
			XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
			goto out_ok;
		}
		//overwrite copyup node, since tmp is a stack dummy variable
		__set_copyup_node (mio, xreq, mn);
		XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
	} else if (req->op == X_SNAPSHOT) {
		//issue write_object;
		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
		struct map *map = mn->map;
		if (!map){
			XSEGLOG2(&lc, E, "Object %s has no map back pointer", mn->object);
			goto out_err;
		}

		/* construct a tmp map_node for writing purposes */
		//char *target = xseg_get_target(peer->xseg, req);
		struct map_node newmn = *mn;
		newmn.flags = 0;
		struct xseg_reply_snapshot *xreply;
		xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
		//assert xreply->targetlen !=0
		//assert xreply->targetlen < XSEG_MAX_TARGETLEN
		//xreply->target[xreply->targetlen] = 0;
		//assert xreply->target valid
		strncpy(newmn.object, xreply->target, xreply->targetlen);
		newmn.object[xreply->targetlen] = 0;
		newmn.objectlen = xreply->targetlen;
		newmn.objectidx = mn->objectidx;
		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
		if (!xreq){
			XSEGLOG2(&lc, E, "Object write returned error for object %s"
					"\n\t of map %s [%llu]",
					mn->object, map->volume, (unsigned long long) mn->objectidx);
			goto out_err;
		}
		mn->flags |= MF_OBJECT_WRITING;
		__set_copyup_node (mio, xreq, mn);

		XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
	} else if (req->op == X_DELETE){
		//deletion of the old block completed
		XSEGLOG2(&lc, I, "Deletion of old object completed");
		goto out_ok;
	}

	/* chain continues; the follow-up request has been tracked on mio */
	xseg_put_request(peer->xseg, req, pr->portno);
	return;

out_ok:
	mio->snap_pending--;
	XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
	signal_pr(pr);
	xseg_put_request(peer->xseg, req, pr->portno);
	return;

out_err:
	mio->snap_pending--;
	mio->err = 1;
	if (mn)
		signal_mapnode(mn);
	signal_pr(pr);
	xseg_put_request(peer->xseg, req, pr->portno);
}
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn = __get_copyup_node(mio, req);
	if (!mn){
		XSEGLOG2(&lc, E, "Cannot get map node");
		goto out_err;
	}
	__set_copyup_node(mio, req, NULL);

	if (req->state & XS_FAILED){
		XSEGLOG2(&lc, E, "Req failed");
		mn->flags &= ~MF_OBJECT_COPYING;
		mn->flags &= ~MF_OBJECT_WRITING;
		goto out_err;
	}
	if (req->op == X_WRITE) {
		char *target = xseg_get_target(peer->xseg, req);
		//printf("handle object write reply\n");
		__set_copyup_node(mio, req, NULL);
		//assert mn->flags & MF_OBJECT_WRITING
		mn->flags &= ~MF_OBJECT_WRITING;

		struct map_node tmp;
		char *data = xseg_get_data(peer->xseg, req);
		map_to_object(&tmp, (unsigned char *) data);
		mn->flags |= MF_OBJECT_EXIST;
		if (mn->flags != MF_OBJECT_EXIST){
			XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
			goto out_err;
		}
		//assert mn->flags & MF_OBJECT_EXIST
		strncpy(mn->object, tmp.object, tmp.objectlen);
		mn->object[tmp.objectlen] = 0;
		mn->objectlen = tmp.objectlen;
		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
		mio->copyups--;
		signal_mapnode(mn);
		signal_pr(pr);
	} else if (req->op == X_COPY) {
		// issue write_object;
		mn->flags &= ~MF_OBJECT_COPYING;
		struct map *map = mn->map;
		if (!map){
			XSEGLOG2(&lc, E, "Object %s has no map back pointer", mn->object);
			goto out_err;
		}

		/* construct a tmp map_node for writing purposes */
		char *target = xseg_get_target(peer->xseg, req);
		struct map_node newmn = *mn;
		newmn.flags = MF_OBJECT_EXIST;
		strncpy(newmn.object, target, req->targetlen);
		newmn.object[req->targetlen] = 0;
		newmn.objectlen = req->targetlen;
		newmn.objectidx = mn->objectidx;
		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
		if (!xreq){
			XSEGLOG2(&lc, E, "Object write returned error for object %s"
					"\n\t of map %s [%llu]",
					mn->object, map->volume, (unsigned long long) mn->objectidx);
			goto out_err;
		}
		mn->flags |= MF_OBJECT_WRITING;
		__set_copyup_node (mio, xreq, mn);

		XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
	}

	xseg_put_request(peer->xseg, req, pr->portno);
	return;

out_err:
	mio->copyups--;
	XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
	mio->err = 1;
	if (mn)
		signal_mapnode(mn);
	signal_pr(pr);
	xseg_put_request(peer->xseg, req, pr->portno);
}
struct r2o {
	struct map_node *mn;
	uint64_t offset;
	uint64_t size;
};
static int req2objs(struct peer_req *pr, struct map *map, int write)
{
	int r = 0;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	char *target = xseg_get_target(peer->xseg, pr->req);
	uint32_t nr_objs = calc_nr_obj(pr->req);
	uint64_t size = sizeof(struct xseg_reply_map) +
			nr_objs * sizeof(struct xseg_reply_map_scatterlist);
	uint32_t idx, i;
	int j;
	uint64_t rem_size, obj_index, obj_offset, obj_size;
	struct map_node *mn;
	mio->err = 0;
	mio->copyups = 0;
	XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);

	/* get map_nodes of request */
	struct r2o *mns = malloc(sizeof(struct r2o) * nr_objs);
	if (!mns){
		XSEGLOG2(&lc, E, "Cannot allocate mns");
		return -1;
	}
	idx = 0;
	rem_size = pr->req->size;
	obj_index = pr->req->offset / block_size;
	obj_offset = pr->req->offset & (block_size -1); //modulo
	obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
	mn = get_mapnode(map, obj_index);
	mns[idx].mn = mn;
	if (!mn) {
		XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
		r = -1;
		goto out;
	}
	mns[idx].offset = obj_offset;
	mns[idx].size = obj_size;
	rem_size -= obj_size;
	while (rem_size > 0) {
		idx++;
		obj_index++;
		obj_offset = 0;
		obj_size = (rem_size > block_size) ? block_size : rem_size;
		rem_size -= obj_size;
		mn = get_mapnode(map, obj_index);
		mns[idx].mn = mn;
		if (!mn) {
			XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
			r = -1;
			goto out;
		}
		mns[idx].offset = obj_offset;
		mns[idx].size = obj_size;
	}
	if (write) {
		/* do a first scan and issue as many copyups as we can.
		 * then retry and wait when an object is not ready.
		 * this could be done better, since now we wait also on the
		 * pending copyups
		 */
		for (j = 0; j < 2 && !mio->err; j++) {
			for (i = 0; i < (idx+1); i++) {
				mn = mns[i].mn;
				//do copyups
				if (mn->flags & MF_OBJECT_NOT_READY){
					if (!j)
						continue;
					if (mn->flags & MF_OBJECT_NOT_READY)
						wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
					if (mn->flags & MF_OBJECT_DESTROYED){
						mio->err = 1;
						continue;
					}
				}
				if (!(mn->flags & MF_OBJECT_EXIST)) {
					//calc new_target, copy up object
					if (copyup_object(peer, mn, pr) == NULL){
						XSEGLOG2(&lc, E, "Error in copy up object");
						mio->err = 1;
					} else {
						mio->copyups++;
					}
				}
			}
		}
		if (mio->err){
			XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
		}
		if (mio->copyups > 0)
			wait_on_pr(pr, mio->copyups > 0);
	}

	if (mio->err){
		XSEGLOG2(&lc, E, "Mio->err");
		r = -1;
		goto out;
	}

	/* resize request to fit reply */
	char buf[XSEG_MAX_TARGETLEN];
	strncpy(buf, target, pr->req->targetlen);
	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
	if (r < 0) {
		XSEGLOG2(&lc, E, "Cannot resize request");
		goto out;
	}
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, buf, pr->req->targetlen);

	/* structure reply */
	struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
	reply->cnt = nr_objs;
	for (i = 0; i < (idx+1); i++) {
		strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
		reply->segs[i].targetlen = mns[i].mn->objectlen;
		reply->segs[i].offset = mns[i].offset;
		reply->segs[i].size = mns[i].size;
	}
	r = 0;
out:
	for (i = 0; i < (idx+1); i++) {
		if (mns[i].mn)
			put_mapnode(mns[i].mn);
	}
	free(mns);
	return r;
}
static int do_dropcache(struct peer_req *pr, struct map *map)
{
	struct map_node *mn;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	uint64_t i;

	XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
	map->flags |= MF_MAP_DROPPING_CACHE;
	for (i = 0; i < calc_map_obj(map); i++) {
		mn = get_mapnode(map, i);
		if (mn) {
			if (!(mn->flags & MF_OBJECT_DESTROYED)){
				//make sure all pending operations on all objects are completed
				if (mn->flags & MF_OBJECT_NOT_READY)
					wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
				mn->flags |= MF_OBJECT_DESTROYED;
			}
			put_mapnode(mn);
		}
	}
	map->flags &= ~MF_MAP_DROPPING_CACHE;
	map->flags |= MF_MAP_DESTROYED;
	remove_map(mapper, map);
	XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
	put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
	return 0;
}
static int do_info(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
	xinfo->size = map->size;
	return 0;
}


static int do_open(struct peer_req *pr, struct map *map)
{
	if (map->flags & MF_MAP_EXCLUSIVE){
		return 0;
	}
	else {
		return -1;
	}
}

static int do_close(struct peer_req *pr, struct map *map)
{
	if (map->flags & MF_MAP_EXCLUSIVE){
		/* do not drop cache if close failed and map not deleted */
		if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
			return -1;
	}
	return do_dropcache(pr, map);
}
static int do_snapshot(struct peer_req *pr, struct map *map)
{
	uint64_t i;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn;
	struct xseg_request *req;
	int r;

	if (!(map->flags & MF_MAP_EXCLUSIVE)){
		XSEGLOG2(&lc, E, "Map was not opened exclusively");
		return -1;
	}
	XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
	map->flags |= MF_MAP_SNAPSHOTTING;

	uint64_t nr_obj = calc_map_obj(map);
	mio->cb = snapshot_cb;
	mio->snap_pending = 0;
	mio->err = 0;
	for (i = 0; i < nr_obj; i++){

		/* throttle pending snapshots
		 * this should be nr_ops of the blocker, but since we don't know
		 * that, we assume based on our own nr_ops
		 */
		if (mio->snap_pending >= peer->nr_ops)
			wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);

		mn = get_mapnode(map, i);
		if (!mn)
			continue;
		if (!(mn->flags & MF_OBJECT_EXIST)){
			put_mapnode(mn);
			continue;
		}
		// make sure all pending operations on all objects are completed
		if (mn->flags & MF_OBJECT_NOT_READY)
			wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

		/* TODO will this ever happen?? */
		if (mn->flags & MF_OBJECT_DESTROYED){
			put_mapnode(mn);
			continue;
		}

		req = __snapshot_object(pr, mn);
		if (!req){
			put_mapnode(mn);
			goto err;
		}
		mio->snap_pending++;
		/* do not put_mapnode here. cb does that */
	}

	if (mio->snap_pending > 0)
		wait_on_pr(pr, mio->snap_pending > 0);
	mio->cb = NULL;
	if (mio->err)
		goto err;

	/* calculate name of snapshot */
	struct map tmp_map = *map;
	unsigned char sha[SHA256_DIGEST_SIZE];
	unsigned char *buf = malloc(block_size);
	char newvolumename[MAX_VOLUME_LEN];
	uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
	uint64_t pos = 0;
	uint64_t max_objidx = calc_map_obj(map);

	if (!buf)
		goto err;
	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
		if (!mn){
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, map->volume);
			free(buf);
			goto err;
		}
		v0_object_to_map(mn, buf+pos);
		pos += v0_objectsize_in_map;
	}
	//	SHA256(buf, pos, sha);
	merkle_hash(buf, pos, sha);
	hexlify(sha, newvolumename);
	strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
	tmp_map.volumelen = newvolumenamelen;
	free(buf);
	tmp_map.version = 0; // set volume version to pithos image

	/* write the map of the snapshot */
	r = write_map(pr, &tmp_map);
	if (r < 0)
		goto err;
	char targetbuf[XSEG_MAX_TARGETLEN];
	char *target = xseg_get_target(peer->xseg, pr->req);
	strncpy(targetbuf, target, pr->req->targetlen);
	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
			sizeof(struct xseg_reply_snapshot));
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot resize request");
		goto err;
	}
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, targetbuf, pr->req->targetlen);

	struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
						xseg_get_data(peer->xseg, pr->req);
	strncpy(xreply->target, newvolumename, newvolumenamelen);
	xreply->targetlen = newvolumenamelen;
	map->flags &= ~MF_MAP_SNAPSHOTTING;
	XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
	return 0;

err:
	map->flags &= ~MF_MAP_SNAPSHOTTING;
	XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
	return -1;
}
static int do_destroy(struct peer_req *pr, struct map *map)
{
	uint64_t i;
	struct peerd *peer = pr->peer;
	struct mapper_io *mio = __get_mapper_io(pr);
	struct map_node *mn;
	struct xseg_request *req;

	if (!(map->flags & MF_MAP_EXCLUSIVE))
		return -1;

	XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
	req = __delete_map(pr, map);
	if (!req)
		return -1;
	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
	if (req->state & XS_FAILED){
		xseg_put_request(peer->xseg, req, pr->portno);
		map->flags &= ~MF_MAP_DELETING;
		return -1;
	}
	xseg_put_request(peer->xseg, req, pr->portno);

	uint64_t nr_obj = calc_map_obj(map);
	mio->cb = deletion_cb;
	mio->del_pending = 0;
	mio->err = 0;
	for (i = 0; i < nr_obj; i++){

		/* throttle pending deletions
		 * this should be nr_ops of the blocker, but since we don't know
		 * that, we assume based on our own nr_ops
		 */
		if (mio->del_pending >= peer->nr_ops)
			wait_on_pr(pr, mio->del_pending >= peer->nr_ops);

		mn = get_mapnode(map, i);
		if (!mn)
			continue;
		if (mn->flags & MF_OBJECT_DESTROYED){
			put_mapnode(mn);
			continue;
		}
		if (!(mn->flags & MF_OBJECT_EXIST)){
			mn->flags |= MF_OBJECT_DESTROYED;
			put_mapnode(mn);
			continue;
		}

		// make sure all pending operations on all objects are completed
		if (mn->flags & MF_OBJECT_NOT_READY)
			wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

		req = __delete_object(pr, mn);
		if (!req){
			put_mapnode(mn);
			continue;
		}
		mio->del_pending++;
		/* do not put_mapnode here. cb does that */
	}

	if (mio->del_pending > 0)
		wait_on_pr(pr, mio->del_pending > 0);
	mio->cb = NULL;

	map->flags &= ~MF_MAP_DELETING;
	map->flags |= MF_MAP_DELETED;
	XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
	return do_close(pr, map);
}
static int do_mapr(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	int r = req2objs(pr, map, 0);
	if (r < 0){
		XSEGLOG2(&lc, I, "Map read of map %s, range: %llu-%llu failed",
				map->volume,
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
		return -1;
	}
	XSEGLOG2(&lc, I, "Map read of map %s, range: %llu-%llu completed",
			map->volume,
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
			(unsigned long long) pr->req->offset,
			(unsigned long long) pr->req->size);
	int i;
	char buf[XSEG_MAX_TARGETLEN+1];
	struct xseg_reply_map *reply = (struct xseg_reply_map *)
			xseg_get_data(peer->xseg, pr->req);
	for (i = 0; i < reply->cnt; i++) {
		XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
		strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
		buf[reply->segs[i].targetlen] = 0;
		XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
				(unsigned long long) reply->segs[i].offset,
				(unsigned long long) reply->segs[i].size);
	}
	return 0;
}
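/*
 * The reply built by req2objs() is a scatter list of (object, offset, size)
 * segments. A minimal client-side sketch of consuming it, assuming a
 * completed request `req` (names illustrative):
 *
 *	struct xseg_reply_map *mreply = (struct xseg_reply_map *)
 *			xseg_get_data(xseg, req);
 *	uint32_t k;
 *	for (k = 0; k < mreply->cnt; k++) {
 *		// perform I/O of mreply->segs[k].size bytes at offset
 *		// mreply->segs[k].offset of the object named by
 *		// mreply->segs[k].target[0..targetlen)
 *	}
 */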
static int do_mapw(struct peer_req *pr, struct map *map)
{
	struct peerd *peer = pr->peer;
	int r = req2objs(pr, map, 1);
	if (r < 0){
		XSEGLOG2(&lc, I, "Map write of map %s, range: %llu-%llu failed",
				map->volume,
				(unsigned long long) pr->req->offset,
				(unsigned long long) (pr->req->offset + pr->req->size));
		return -1;
	}
	XSEGLOG2(&lc, I, "Map write of map %s, range: %llu-%llu completed",
			map->volume,
			(unsigned long long) pr->req->offset,
			(unsigned long long) (pr->req->offset + pr->req->size));
	XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
			(unsigned long long) pr->req->offset,
			(unsigned long long) pr->req->size);
	int i;
	char buf[XSEG_MAX_TARGETLEN+1];
	struct xseg_reply_map *reply = (struct xseg_reply_map *)
			xseg_get_data(peer->xseg, pr->req);
	for (i = 0; i < reply->cnt; i++) {
		XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
		strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
		buf[reply->segs[i].targetlen] = 0;
		XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
				(unsigned long long) reply->segs[i].offset,
				(unsigned long long) reply->segs[i].size);
	}
	return 0;
}
/* here, map is the parent map */
static int do_clone(struct peer_req *pr, struct map *map)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	char *target = xseg_get_target(peer->xseg, pr->req);
	struct map *clonemap;
	struct xseg_request_clone *xclone =
		(struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);

	XSEGLOG2(&lc, I, "Cloning map %s", map->volume);

	clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
	if (!clonemap)
		return -1;

	/* open map to get exclusive access to it */
	r = open_map(pr, clonemap, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}
	r = load_map(pr, clonemap);
	if (r >= 0){
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}

	if (xclone->size == -1)
		clonemap->size = map->size;
	else
		clonemap->size = xclone->size;
	if (clonemap->size < map->size){
		XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
				"\n\t for requested clone %s",
				(unsigned long long) xclone->size,
				(unsigned long long) map->size, clonemap->volume);
		goto out_err;
	}
	if (clonemap->size > MAX_VOLUME_SIZE) {
		XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
				"\n\t for volume %s",
				(unsigned long long) clonemap->size,
				(unsigned long long) MAX_VOLUME_SIZE,
				clonemap->volume);
		goto out_err;
	}

	/* allocate and initialize the map nodes */
	unsigned long c = calc_map_obj(clonemap);
	struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
	if (!map_nodes)
		goto out_err;
	int i;
	for (i = 0; i < c; i++) {
		struct map_node *mn = get_mapnode(map, i);
		if (mn) {
			/* inherit the parent's object */
			strncpy(map_nodes[i].object, mn->object, mn->objectlen);
			map_nodes[i].objectlen = mn->objectlen;
			put_mapnode(mn);
		} else {
			/* beyond the parent: back with the zero block */
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
		}
		map_nodes[i].object[map_nodes[i].objectlen] = 0; /* NUL-terminate */
		map_nodes[i].flags = 0;
		map_nodes[i].objectidx = i;
		map_nodes[i].map = clonemap;
		map_nodes[i].ref = 1;
		map_nodes[i].waiters = 0;
		map_nodes[i].cond = st_cond_new(); //FIXME errcheck
		r = insert_object(clonemap, &map_nodes[i]);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i,
					clonemap->volume);
			goto out_err;
		}
	}
	r = write_map(pr, clonemap);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
		goto out_err;
	}
	do_close(pr, clonemap);
	return 0;

out_err:
	do_close(pr, clonemap);
	return -1;
}
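/*
 * Worked example of the sizes involved: with block_size = 1 << 22 (4 MiB),
 * a 1 GiB clone gets calc_map_obj() == 256 map nodes, and 1 GiB + 1 byte
 * gets 257, since any remainder claims one more (partially used) object.
 * Likewise, a single map block holds (block_size - mapheader_size) /
 * objectsize_in_map = (4194304 - 12) / 33 = 127099 object entries, which is
 * where MAX_VOLUME_SIZE (roughly 496 GiB) comes from.
 */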
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
	int r, opened = 0;
	if (flags & MF_EXCLUSIVE){
		r = open_map(pr, map, flags);
		if (r < 0 && (flags & MF_FORCE))
			return -1;
		opened = (r >= 0);
	}
	r = load_map(pr, map);
	if (r < 0 && opened)
		close_map(pr, map);
	return r;
}
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
		uint32_t flags)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct map *map = find_map_len(mapper, name, namelen, flags);
	if (!map && (flags & MF_LOAD)){
		map = create_map(mapper, name, namelen, flags);
		if (!map)
			return NULL;
		r = open_load_map(pr, map, flags);
		if (r < 0){
			do_dropcache(pr, map);
			return NULL;
		}
	} else if (map && (map->flags & MF_MAP_DESTROYED))
		return NULL;
	return map;
}
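/*
 * get_map() thus implements lookup-or-create-and-load: a cache hit returns
 * the in-core map, a miss with MF_LOAD creates a fresh struct map and pulls
 * it in via open_load_map(). E.g. map_action() below resolves its target
 * with flags such as:
 *
 *	struct map *map = get_map(pr, name, namelen,
 *			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
 */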
static int map_action(int (action)(struct peer_req *pr, struct map *map),
		struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
{
	struct map *map;
	map = get_map(pr, name, namelen, flags);
	if (!map)
		return -1;
	if (map->flags & MF_MAP_NOT_READY){
		wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
	}
	int r = action(pr, map);
	/* always drop the cache if the map is not held exclusively */
	if (!(map->flags & MF_MAP_EXCLUSIVE))
		do_dropcache(pr, map);
	return r;
}
void * handle_info(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_info, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}
void * handle_clone(struct peer_req *pr)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request_clone *xclone = (struct xseg_request_clone *)
			xseg_get_data(peer->xseg, pr->req);
	if (xclone->targetlen){
		/* a snap was specified; support cloning only from pithos */
		r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
				MF_LOAD);
	} else {
		/* otherwise, try to create a new volume */
		XSEGLOG2(&lc, I, "Creating volume");
		if (!xclone->size){
			XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
			goto out_err;
		}
		if (xclone->size > MAX_VOLUME_SIZE) {
			XSEGLOG2(&lc, E, "Requested size %llu > max volume "
					"size %llu",
					(unsigned long long) xclone->size,
					(unsigned long long) MAX_VOLUME_SIZE);
			goto out_err;
		}
		struct map *map;
		char *target = xseg_get_target(peer->xseg, pr->req);

		/* create a new empty map of the requested size */
		map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
		if (!map)
			goto out_err;
		/* open the map to get exclusive access to it */
		r = open_map(pr, map, 0);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
			XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
			do_dropcache(pr, map);
			goto out_err;
		}
		r = load_map(pr, map);
		if (r >= 0){
			XSEGLOG2(&lc, E, "Map exists %s", map->volume);
			do_close(pr, map);
			goto out_err;
		}
		map->size = xclone->size;
		/* populate the map with zero objects */
		uint64_t nr_objs = xclone->size / block_size;
		if (xclone->size % block_size)
			nr_objs++;
		struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
		if (!map_nodes){
			do_close(pr, map);
			goto out_err;
		}
		uint64_t i;
		for (i = 0; i < nr_objs; i++) {
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
			map_nodes[i].object[map_nodes[i].objectlen] = 0; /* NUL-terminate */
			map_nodes[i].flags = 0;
			map_nodes[i].objectidx = i;
			map_nodes[i].map = map;
			map_nodes[i].ref = 1;
			map_nodes[i].waiters = 0;
			map_nodes[i].cond = st_cond_new(); //FIXME errcheck
			r = insert_object(map, &map_nodes[i]);
			if (r < 0){
				do_close(pr, map);
				goto out_err;
			}
		}
		r = write_map(pr, map);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
			do_close(pr, map);
			goto out_err;
		}
		XSEGLOG2(&lc, I, "Volume %s created", map->volume);
		do_close(pr, map); /* drop the cache here for consistency */
		r = 0;
	}
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;

out_err:
	fail(peer, pr);
	return NULL;
}
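/*
 * Creating a brand-new volume is therefore an X_CLONE request with an empty
 * snap name. A hedged client-side sketch (port numbers and the 1 GiB size
 * are illustrative):
 *
 *	struct xseg_request *req = xseg_get_request(xseg, srcport, mapperport,
 *			X_ALLOC);
 *	xseg_prep_request(xseg, req, volnamelen,
 *			sizeof(struct xseg_request_clone));
 *	// ...copy the new volume name into the request target...
 *	struct xseg_request_clone *xc = (struct xseg_request_clone *)
 *			xseg_get_data(xseg, req);
 *	xc->targetlen = 0;		// no parent snapshot
 *	xc->size = 1ULL << 30;		// requested volume size
 *	req->op = X_CLONE;
 */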
void * handle_mapr(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_mapr, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}

void * handle_mapw(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	int r = map_action(do_mapw, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	XSEGLOG2(&lc, D, "Ta: %d", ta);
	return NULL;
}

void * handle_destroy(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* Request EXCLUSIVE access, but do not force it;
	 * do_destroy checks whether the lock was actually acquired.
	 */
	int r = map_action(do_destroy, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}

void * handle_open(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* load the map and acquire exclusive access */
	int r = map_action(do_open, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}

void * handle_close(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* no need to load the map here */
	int r = map_action(do_close, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}

void * handle_snapshot(struct peer_req *pr)
{
	struct peerd *peer = pr->peer;
	char *target = xseg_get_target(peer->xseg, pr->req);
	/* Request EXCLUSIVE access, but do not force it;
	 * do_snapshot checks whether the lock was actually acquired.
	 */
	int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
			MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	return NULL;
}
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
		struct xseg_request *req)
{
	struct mapper_io *mio = __get_mapper_io(pr);
	void *(*action)(struct peer_req *) = NULL;

	mio->state = ACCEPTED;
	switch (pr->req->op) {
		/* primary xseg operations of the mapper */
		case X_CLONE: action = handle_clone; break;
		case X_MAPR: action = handle_mapr; break;
		case X_MAPW: action = handle_mapw; break;
		case X_SNAPSHOT: action = handle_snapshot; break;
		case X_INFO: action = handle_info; break;
		case X_DELETE: action = handle_destroy; break;
		case X_OPEN: action = handle_open; break;
		case X_CLOSE: action = handle_close; break;
		default: fprintf(stderr, "mydispatch: unknown op\n"); break;
	}
	if (action){
		ta++;
		st_thread_create(action, pr, 0, 0);
	}
	return 0;
}
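/*
 * Each accepted request runs in its own State Threads coroutine, so the
 * handle_* functions above may block in wait_on_map()/wait_on_pr() without
 * stalling the peer. st_thread_create(start, arg, joinable, stack_size) is
 * the standard ST call; joinable = 0 and stack_size = 0 (default stack)
 * match the detached, fire-and-forget use here.
 */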
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
		enum dispatch_reason reason)
{
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);

	if (reason == dispatch_accept)
		dispatch_accepted(peer, pr, req);
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
{
	int i;
	//FIXME error checks
	struct mapperd *mapperd = malloc(sizeof(struct mapperd));
	peer->priv = mapperd;
	mapper = mapperd;
	mapper->hashmaps = xhash_new(3, STRING);

	for (i = 0; i < peer->nr_ops; i++) {
		struct mapper_io *mio = malloc(sizeof(struct mapper_io));
		mio->copyups_nodes = xhash_new(3, INTEGER);
		peer->peer_reqs[i].priv = mio;
	}

	mapper->bportno = -1;
	mapper->mbportno = -1;
	BEGIN_READ_ARGS(argc, argv);
	READ_ARG_ULONG("-bp", mapper->bportno);
	READ_ARG_ULONG("-mbp", mapper->mbportno);
	END_READ_ARGS();
	if (mapper->bportno == -1){
		XSEGLOG2(&lc, E, "Portno for blocker must be provided");
		return -1;
	}
	if (mapper->mbportno == -1){
		XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
		return -1;
	}

	const struct sched_param param = { .sched_priority = 99 };
	sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
	/* FIXME: maybe place this in peer;
	 * it should be done for each port (sportno to eportno)
	 */
	xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
	xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);

	return 0;
}
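/*
 * Hypothetical invocation, assuming the common peer options parsed before
 * BEGIN_READ_ARGS plus the two ports read above (binary name, segment spec
 * and port numbers are illustrative):
 *
 *	archip-mapperd -g xsegdev:xseg:16:1024:12 -p 3 -bp 0 -mbp 1
 *
 * -bp names the blocker port serving data blocks, -mbp the blocker port
 * serving map objects.
 */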
/* FIXME: this should not be here */
int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
{
	struct xseg *xseg = peer->xseg;
	xport portno_start = peer->portno_start;
	xport portno_end = peer->portno_end;
	struct peer_req *pr;
	xport i;
	int r;
	struct xseg_request *received;
	xseg_prepare_wait(xseg, portno_start);
	while (1) {
		XSEGLOG2(&lc, D, "Attempting to check for reply");
		for (i = portno_start; i <= portno_end; i++) {
			received = xseg_receive(xseg, i, 0);
			if (!received)
				continue;
			r = xseg_get_req_data(xseg, received, (void **) &pr);
			if (r < 0 || !pr || received != expected_req){
				XSEGLOG2(&lc, W, "Received request with no pr data");
				xport p = xseg_respond(peer->xseg, received,
						peer->portno_start, X_ALLOC);
				if (p == NoPort){
					XSEGLOG2(&lc, W, "Could not respond to stale request");
					xseg_put_request(xseg, received, portno_start);
					continue;
				}
				xseg_signal(xseg, p);
				continue;
			}
			xseg_cancel_wait(xseg, portno_start);
			return 0;
		}
		xseg_wait_signal(xseg, 1000000UL);
	}
}
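/*
 * Usage sketch: wait_reply() polls every port of this peer until the given
 * request completes, e.g. paired with __close_map() as in
 * custom_peer_finalize() below:
 *
 *	req = __close_map(pr, map);
 *	if (req)
 *		wait_reply(peer, req);
 */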
void custom_peer_finalize(struct peerd *peer)
{
	struct mapperd *mapper = __get_mapperd(peer);
	struct peer_req *pr = alloc_peer_req(peer);
	if (!pr){
		XSEGLOG2(&lc, E, "Cannot get peer request");
		return;
	}
	struct map *map;
	struct xseg_request *req;
	xhash_iter_t it;
	xhashidx key, val;
	xhash_iter_init(mapper->hashmaps, &it);
	while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
		map = (struct map *)val;
		if (!(map->flags & MF_MAP_EXCLUSIVE))
			continue;
		req = __close_map(pr, map);
		if (!req)
			continue;
		wait_reply(peer, req);
		if (!(req->state & XS_SERVED))
			XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
		map->flags &= ~MF_MAP_CLOSING;
		xseg_put_request(peer->xseg, req, pr->portno);
	}
	return;
}
void print_obj(struct map_node *mn)
{
	fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
			(unsigned long long) mn->objectidx, mn->object,
			(unsigned int) mn->objectlen,
			(mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
}
void print_map(struct map *m)
{
	uint64_t nr_objs = m->size/block_size;
	if (m->size % block_size)
		nr_objs++;
	fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
			m->volume, m->volumelen,
			(unsigned long long) m->size,
			(unsigned long long) nr_objs,
			m->version);
	uint64_t i;
	struct map_node *mn;
	if (nr_objs > 1000000) //FIXME to protect against invalid volume size
		return;
	for (i = 0; i < nr_objs; i++) {
		mn = find_object(m, i);
		if (!mn){
			printf("object idx [%llu] not found!\n", (unsigned long long) i);
			continue;
		}
		print_obj(mn);
	}
}
void test_map(struct peerd *peer)
{
	int i, j, ret;
	unsigned char buf[SHA256_DIGEST_SIZE];
	char buf_new[XSEG_MAX_TARGETLEN + 20];
	struct map *m = malloc(sizeof(struct map));
	strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab",
			XSEG_MAX_TARGETLEN + 1);
	m->volume[XSEG_MAX_TARGETLEN] = 0;
	strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
	buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
	m->volumelen = XSEG_MAX_TARGETLEN;
	m->size = 100*block_size;
	m->objects = xhash_new(3, INTEGER);
	struct map_node *map_node = calloc(100, sizeof(struct map_node));
	for (i = 0; i < 100; i++) {
		sprintf(buf_new + XSEG_MAX_TARGETLEN, "%u", i);
		gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
		/* hexlify the digest into the object name */
		for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
			sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
		}
		map_node[i].objectidx = i;
		map_node[i].objectlen = XSEG_MAX_TARGETLEN;
		map_node[i].flags = MF_OBJECT_EXIST;
		ret = insert_object(m, &map_node[i]);
	}

	char *data = malloc(block_size);
	mapheader_to_map(m, data);
	uint64_t pos = mapheader_size;

	for (i = 0; i < 100; i++) {
		map_node = find_object(m, i);
		if (!map_node){
			printf("no object node %d \n", i);
			return;
		}
		object_to_map(data+pos, map_node);
		pos += objectsize_in_map;
	}

	struct map *m2 = malloc(sizeof(struct map));
	strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab",
			XSEG_MAX_TARGETLEN + 1);
	m2->volume[XSEG_MAX_TARGETLEN] = 0;
	m2->volumelen = XSEG_MAX_TARGETLEN;

	m2->objects = xhash_new(3, INTEGER);
	ret = read_map(peer, m2, data);

	int r, sum = 0;
	int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
	while (sum < block_size) {
		r = write(fd, data + sum, block_size - sum);
		if (r < 0){
			printf("write error\n");
			return;
		}
		sum += r;
	}
	close(fd);

	map_node = find_object(m, 0);