2 * Copyright 2012 GRNET S.A. All rights reserved.
4 * Redistribution and use in source and binary forms, with or
5 * without modification, are permitted provided that the following
8 * 1. Redistributions of source code must retain the above
9 * copyright notice, this list of conditions and the following
11 * 2. Redistributions in binary form must reproduce the above
12 * copyright notice, this list of conditions and the following
13 * disclaimer in the documentation and/or other materials
14 * provided with the distribution.
16 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
29 * The views and conclusions contained in the software and
30 * documentation are those of the authors and should not be
31 * interpreted as representing official policies, either expressed
32 * or implied, of GRNET S.A.
37 #include <sys/types.h>
39 #include <xseg/xseg.h>
42 #include <xtypes/xlock.h>
43 #include <xtypes/xhash.h>
44 #include <xseg/protocol.h>
49 #include <sys/syscall.h>
50 #include <openssl/sha.h>
/* general mapper flags */
#define MF_LOAD (1 << 0)
#define MF_EXCLUSIVE (1 << 1)
#define MF_FORCE (1 << 2)
#define MF_ARCHIP (1 << 3) /* target uses the archipelago name prefix (see find_map_len) */
#ifndef SHA256_DIGEST_SIZE
#define SHA256_DIGEST_SIZE 32
/* hex representation of sha256 value takes up double the sha256 size */
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
#define block_size (1<<22) //FIXME this should be defined here?
/* transparency byte + max object len in disk */
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
/* Map header contains: version (uint32_t) + volume size (uint64_t) */
#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
/* Prefix prepended to every archipelago volume/object name. */
#define MAPPER_PREFIX "archip_"
#define MAPPER_PREFIX_LEN 7
#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
/* Object names: prefix + hexlified sha256 digest. */
#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
#error "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
/* Largest volume a single map block can describe. */
#define MAX_VOLUME_SIZE \
((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
//char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
/* pithos considers this a block full of zeros, so should we.
 * it is actually the sha256 hash of nothing (the empty input). */
char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
/* dispatch_internal mapper states */
/* Function-pointer type taking a peer request and its xseg request;
 * presumably a completion callback -- confirm at dispatch call sites. */
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
/* mapper object flags */
#define MF_OBJECT_EXIST (1 << 0)
#define MF_OBJECT_COPYING (1 << 1)
#define MF_OBJECT_WRITING (1 << 2)
#define MF_OBJECT_DELETING (1 << 3)
#define MF_OBJECT_DESTROYED (1 << 5)
#define MF_OBJECT_SNAPSHOTTING (1 << 6)
/* Any in-flight operation on an object makes it not-ready for new work. */
#define MF_OBJECT_NOT_READY (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
				MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
/* NOTE(review): struct map_node member (enclosing definition not visible here). */
char object[MAX_OBJECT_LEN + 1]; /* NULL terminated string */
/*
 * Cooperative wait/signal helpers built on st_cond_* (state threads).
 * wait_* loop on the caller-supplied condition, logging and sleeping on the
 * object's cond var each iteration; signal_* wake waiters (signal_mapnode
 * broadcasts because several requests may wait on one object).
 * 'ta' appears to be a global thread-accounting counter -- confirm upstream.
 * No comments are inserted between macro lines: they are '\'-continued.
 */
#define wait_on_pr(__pr, __condition__) \
	while (__condition__){ \
		__get_mapper_io(pr)->active = 0;\
		XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u", pr, ta); \
		st_cond_wait(__pr->cond); \
#define wait_on_mapnode(__mn, __condition__) \
	while (__condition__){ \
		XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
		ta: %u", __mn, __mn->object, __mn->waiters, ta); \
		st_cond_wait(__mn->cond); \
#define wait_on_map(__map, __condition__) \
	while (__condition__){ \
		XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
				__map, __map->volume, __map->waiters, ta); \
		st_cond_wait(__map->cond); \
#define signal_pr(__pr) \
	if (!__get_mapper_io(pr)->active){\
		XSEGLOG2(&lc, D, "Signaling pr %lx, ta: %u", pr, ta);\
		__get_mapper_io(pr)->active = 1;\
		st_cond_signal(__pr->cond); \
#define signal_map(__map) \
	if (__map->waiters) { \
		XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
		ta: %u", __map, __map->volume, __map->waiters, ta); \
		st_cond_signal(__map->cond); \
#define signal_mapnode(__mn) \
	if (__mn->waiters) { \
		ta += __mn->waiters; \
		XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
		%u, ta: %u", __mn, __mn->object, __mn->waiters, ta); \
		st_cond_broadcast(__mn->cond); \
/* map flags: one bit per in-flight or terminal map state */
#define MF_MAP_LOADING (1 << 0)
#define MF_MAP_DESTROYED (1 << 1)
#define MF_MAP_WRITING (1 << 2)
#define MF_MAP_DELETING (1 << 3)
#define MF_MAP_DROPPING_CACHE (1 << 4)
#define MF_MAP_EXCLUSIVE (1 << 5)
#define MF_MAP_OPENING (1 << 6)
#define MF_MAP_CLOSING (1 << 7)
#define MF_MAP_DELETED (1 << 8)
#define MF_MAP_SNAPSHOTTING (1 << 9)
#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
				MF_MAP_DROPPING_CACHE|MF_MAP_OPENING| \
char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
/* NOTE(review): the members below belong to struct map / struct mapperd /
 * struct mapper_io; the enclosing struct headers are not visible here. */
xhash_t *objects; /* obj_index --> map_node */
xport bportno; /* blocker that accesses data */
xport mbportno; /* blocker that accesses maps */
xhash_t *hashmaps; // hash_function(target) --> struct map
/* per-request (mapper_io) bookkeeping */
volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
struct map_node *copyup_node;
volatile int err; /* error flag */
volatile uint64_t del_pending;
volatile uint64_t snap_pending;
enum mapper_state state;
struct mapperd *mapper;
/* Debug helper, defined elsewhere in this file. */
void print_map(struct map *m);
/* Print mapper-specific command line options to stderr. */
void custom_peer_usage()
	fprintf(stderr, "Custom peer options: \n"
		"-bp : port for block blocker(!)\n"
		"-mbp : port for map blocker\n"
265 static inline struct mapperd * __get_mapperd(struct peerd *peer)
267 return (struct mapperd *) peer->priv;
270 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
272 return (struct mapper_io *) pr->priv;
275 static inline uint64_t calc_map_obj(struct map *map)
279 uint64_t nr_objs = map->size / block_size;
280 if (map->size % block_size)
285 static uint32_t calc_nr_obj(struct xseg_request *req)
288 uint64_t rem_size = req->size;
289 uint64_t obj_offset = req->offset & (block_size -1); //modulo
290 uint64_t obj_size = (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
291 rem_size -= obj_size;
292 while (rem_size > 0) {
293 obj_size = (rem_size > block_size) ? block_size : rem_size;
294 rem_size -= obj_size;
302 * Unsafe. Doesn't check if data length is odd!
304 static void hexlify(unsigned char *data, char *hex)
307 for (i=0; i<SHA256_DIGEST_LENGTH; i++)
308 sprintf(hex+2*i, "%02x", data[i]);
/*
 * Inverse of hexlify: decode a hex string back into SHA256_DIGEST_LENGTH
 * raw bytes. Per-digit parsing is partially elided in this chunk; the line
 * below installs the high nibble of each output byte.
 */
static void unhexlify(char *hex, unsigned char *data)
	for (i=0; i<SHA256_DIGEST_LENGTH; i++){
		data[i] |= (c << 4) & 0xF0;
/*
 * Maps handling functions
 */
/* Look up an in-core map by NUL-terminated volume name in mapper->hashmaps. */
static struct map * find_map(struct mapperd *mapper, char *volume)
	struct map *m = NULL;
	int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
/*
 * Look up a map by (target, targetlen). With MF_ARCHIP the MAPPER_PREFIX is
 * prepended first, so callers pass the bare volume name. Rejects names
 * longer than MAX_VOLUME_LEN, then defers to find_map().
 */
static struct map * find_map_len(struct mapperd *mapper, char *target,
		uint32_t targetlen, uint32_t flags)
	char buf[XSEG_MAX_TARGETLEN+1];
	if (flags & MF_ARCHIP){
		strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
		strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
		buf[MAPPER_PREFIX_LEN + targetlen] = 0;
		targetlen += MAPPER_PREFIX_LEN;
	/* non-archip path: copy the raw target (NUL added in elided code) */
	strncpy(buf, target, targetlen);
	if (targetlen > MAX_VOLUME_LEN){
		XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
			targetlen, MAX_VOLUME_LEN);
	XSEGLOG2(&lc, D, "looking up map %s, len %u",
	return find_map(mapper, buf);
/*
 * Insert 'map' into mapper->hashmaps keyed by its volume name.
 * Refuses duplicates; grows the hash and retries on -XHASH_ERESIZE.
 */
static int insert_map(struct mapperd *mapper, struct map *map)
	if (find_map(mapper, map->volume)){
		XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
	XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
			map->volume, strlen(map->volume), (unsigned long) map);
	r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
			XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
		mapper->hashmaps = new_hashmap;
		r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
/*
 * Remove 'map' from mapper->hashmaps, shrinking the hash and retrying on
 * -XHASH_ERESIZE. Caller must guarantee no pending prs reference the map.
 */
static int remove_map(struct mapperd *mapper, struct map *map)
	//assert no pending pr on map
	r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
	while (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
		xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
			XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
					(unsigned long long) shift);
		mapper->hashmaps = new_hashmap;
		r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
/*
 * Allocate, prepare and submit a close request for 'map' to the maps
 * blocker (mapper->mbportno). Marks MF_MAP_CLOSING and returns the
 * in-flight request on success; error paths detach the request data and
 * put the request back.
 */
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
	struct peerd *peer = pr->peer;
	struct xseg_request *req;
	struct mapperd *mapper = __get_mapperd(peer);
	XSEGLOG2(&lc, I, "Closing map %s", map->volume);
	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
	r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, map->volume, req->targetlen);
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
	map->flags |= MF_MAP_CLOSING;
	XSEGLOG2(&lc, I, "Map %s closing", map->volume);
	/* error unwind path */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Synchronous wrapper around __close_map: submit, sleep on pr until the
 * request is served or failed, clear MF_MAP_CLOSING, and report failure.
 */
static int close_map(struct peer_req *pr, struct map *map)
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	req = __close_map(pr, map);
	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_CLOSING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Return a cached map for (target, targetlen) through *m, queueing pr on
 * the map when it is busy (MF_MAP_NOT_READY); otherwise fall through and
 * open/load the map from storage.
 */
static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
			char *target, uint32_t targetlen, struct map **m)
	struct mapperd *mapper = __get_mapperd(peer);
	*m = find_map(mapper, target, targetlen);
		XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
		if ((*m)->flags & MF_MAP_NOT_READY) {
			/* park this request until the in-flight op finishes */
			__xq_append_tail(&(*m)->pending, (xqindex) pr);
			XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
		//} else if ((*m)->flags & MF_MAP_DESTROYED){
		XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
	r = open_map(peer, pr, target, targetlen, 0);
/*
 * Object handling functions
 */
/* Look up the map_node at object index 'obj_index' in map->objects. */
struct map_node *find_object(struct map *map, uint64_t obj_index)
	int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
/*
 * Insert map_node 'mn' into map->objects keyed by objectidx, growing the
 * hash on -XHASH_ERESIZE.
 * NOTE(review): unlike insert_map, the xhash_resize result is assigned
 * without a NULL check here -- verify against the elided lines.
 */
static int insert_object(struct map *map, struct map_node *mn)
	//FIXME no find object first
	int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		unsigned long shift = xhash_grow_size_shift(map->objects);
		map->objects = xhash_resize(map->objects, shift, NULL);
		r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
/*
 * map read/write functions
 *
 * version 0 -> pithos map
 * version 1 -> archipelago version 1
 *
 * Each on-disk map format provides this vtable:
 * int read_object(struct map_node *mn, unsigned char *buf)
 * int prepare_write_object(struct peer_req *pr, struct map *map,
 *                          struct map_node *mn, struct xseg_request *req)
 * int read_map(struct map *m, unsigned char * data)
 * int prepare_write_map(struct peer_req *pr, struct map *map,
 *                       struct xseg_request *req)
 */
struct map_functions {
	int (*read_object)(struct map_node *mn, unsigned char *buf);
	int (*prepare_write_object)(struct peer_req *pr, struct map *map,
			struct map_node *mn, struct xseg_request *req);
	int (*read_map)(struct map *m, unsigned char * data);
	int (*prepare_write_map)(struct peer_req *pr, struct map *map,
			struct xseg_request *req);
/* version 0 functions */
/* v0 (pithos) maps have no header ... */
#define v0_mapheader_size 0
/* just the unhexlified name */
#define v0_objectsize_in_map SHA256_DIGEST_SIZE
/* Decode a v0 map entry: hexlify the raw digest into mn->object and mark
 * the object as existing. */
static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
	hexlify(buf, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
	mn->flags = MF_OBJECT_EXIST;
610 static void v0_object_to_map(struct map_node *mn, unsigned char *data)
612 unhexlify(mn->object, data);
/*
 * Fill 'req' so it writes mn's single v0 map entry at the entry's offset
 * within the map object. Target is the map's volume; payload is the raw
 * digest produced by v0_object_to_map.
 */
static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req)
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
	unsigned char *data = xseg_get_data(pr->peer->xseg, req);
	v0_object_to_map(mn, data);
/*
 * Parse a whole v0 (pithos) map block: iterate fixed-size digest entries
 * until an all-zero entry, building a calloc'ed map_node array and
 * inserting each node into m->objects. The map size is inferred from the
 * number of entries found (i * block_size).
 */
static int read_map_v0(struct map *m, unsigned char * data)
	struct map_node *map_node;
	uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
	XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);
	map_node = calloc(max_nr_objs, sizeof(struct map_node));
	for (i = 0; i < max_nr_objs; i++) {
		/* an all-zero entry terminates the map */
		if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
		map_node[i].objectidx = i;
		map_node[i].waiters = 0;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v0(&map_node[i], data+pos);
		pos += v0_objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
	XSEGLOG2(&lc, D, "Found %llu objects", i);
	m->size = i * block_size;
/*
 * Fill 'req' so it rewrites the entire v0 map: serialize every object
 * entry (no header in v0) into the request payload in index order.
 * Fails if any object index has no in-core map_node.
 */
static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
				struct xseg_request *req)
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
	int r = xseg_prep_request(peer->xseg, req, map->volumelen,
			v0_mapheader_size + max_objidx * v0_objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);
	req->size = req->datalen;
	for (i = 0; i < max_objidx; i++) {
		mn = find_object(map, i);
			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
					(unsigned long long) i, map->volume);
		v0_object_to_map(mn, (unsigned char *)(data+pos));
		pos += v0_objectsize_in_map;
	XSEGLOG2(&lc, D, "Prepared %llu objects", i);
/* Designated-initializer body for the v0 vtable entry in map_functions[].
 * The commented-out static definition below was its predecessor. */
/* static struct map_functions map_functions_v0 = { */
/* 	.read_object = read_object_v0, */
/* 	.read_map = read_map_v0, */
/* 	.prepare_write_object = prepare_write_object_v0, */
/* 	.prepare_write_map = prepare_write_map_v0 */
#define map_functions_v0 { \
	.read_object = read_object_v0, \
	.read_map = read_map_v0, \
	.prepare_write_object = prepare_write_object_v0,\
	.prepare_write_map = prepare_write_map_v0 \
/* transparency byte + max object len in disk */
#define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
/* Map header contains: version (uint32_t) + volume size (uint64_t) */
#define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
/*
 * Decode a v1 map entry: buf[0] is the transparency byte (object exists),
 * followed by the raw digest. Existing objects get the archip prefix;
 * non-existing ones keep the bare hexlified digest (else-branch below).
 */
static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
	mn->flags |= MF_OBJECT_EXIST;
	strcpy(mn->object, MAPPER_PREFIX);
	hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
	mn->object[MAX_OBJECT_LEN] = 0;
	mn->objectlen = strlen(mn->object);
	mn->flags &= ~MF_OBJECT_EXIST;
	hexlify(buf+1, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = strlen(mn->object);
/*
 * Serialize a map node to v1 on-disk form: transparency byte first, then
 * the unhexlified digest (with the archip prefix stripped when present).
 */
static inline void v1_object_to_map(char* buf, struct map_node *mn)
	buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
	/* strip common prefix */
	unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
	unhexlify(mn->object, (unsigned char *)(buf+1));
/*
 * Fill 'req' so it writes mn's single v1 map entry at its offset past the
 * v1 header. Mirrors prepare_write_object_v0 with v1 sizes/offsets.
 */
static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
				struct map_node *mn, struct xseg_request *req)
	struct peerd *peer = pr->peer;
	int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, map->volume, req->targetlen);
	req->size = req->datalen;
	req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
	char *data = xseg_get_data(pr->peer->xseg, req);
	v1_object_to_map(data, mn);
/*
 * Parse a v1 map blob: read the header (version, size), derive the object
 * count by rounding size up to block_size, then decode each fixed-size
 * entry into a calloc'ed map_node array inserted into m->objects.
 */
static int read_map_v1(struct map *m, unsigned char * data)
	struct map_node *map_node;
	m->version = *(uint32_t *) (data + pos);
	pos += sizeof(uint32_t);
	m->size = *(uint64_t *) (data + pos);
	pos += sizeof(uint64_t);
	nr_objs = m->size / block_size;
	if (m->size % block_size)
	map_node = calloc(nr_objs, sizeof(struct map_node));
	for (i = 0; i < nr_objs; i++) {
		map_node[i].objectidx = i;
		map_node[i].waiters = 0;
		map_node[i].cond = st_cond_new(); //FIXME err check;
		read_object_v1(&map_node[i], data+pos);
		pos += objectsize_in_map;
		r = insert_object(m, &map_node[i]); //FIXME error check
/*
 * Fill 'req' so it rewrites the entire v1 map: header (version + size)
 * followed by every object entry in index order. Fails when an object
 * index has no in-core map_node.
 */
static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
				struct xseg_request *req)
	struct peerd *peer = pr->peer;
	uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
	int r = xseg_prep_request(peer->xseg, req, m->volumelen,
			v1_mapheader_size + max_objidx * v1_objectsize_in_map);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, m->volume, req->targetlen);
	char *data = xseg_get_data(peer->xseg, req);
	/* serialize the v1 header before the entries */
	memcpy(data + pos, &m->version, sizeof(m->version));
	pos += sizeof(m->version);
	memcpy(data + pos, &m->size, sizeof(m->size));
	pos += sizeof(m->size);
	req->size = req->datalen;
	for (i = 0; i < max_objidx; i++) {
		mn = find_object(m, i);
			XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
					(unsigned long long) i, m->volume);
		v1_object_to_map(data+pos, mn);
		pos += v1_objectsize_in_map;
/* Designated-initializer body for the v1 vtable entry in map_functions[].
 * The commented-out static definition below was its predecessor. */
/* static struct map_functions map_functions_v1 = { */
/* 	.read_object = read_object_v1, */
/* 	.read_map = read_map_v1, */
/* 	.prepare_write_object = prepare_write_object_v1, */
/* 	.prepare_write_map = prepare_write_map_v1 */
#define map_functions_v1 { \
	.read_object = read_object_v1, \
	.read_map = read_map_v1, \
	.prepare_write_object = prepare_write_object_v1,\
	.prepare_write_map = prepare_write_map_v1 \
/* Version-indexed dispatch table: map_functions[map->version].xxx(...) */
static struct map_functions map_functions[] = { map_functions_v0,
#define MAP_LATEST_VERSION 1
/* end of functions */
872 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
874 hexlify(buf, mn->object);
875 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
876 mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
877 mn->flags = MF_OBJECT_EXIST;
/*
 * Decode a v1-style map entry into 'mn' (legacy duplicate of
 * read_object_v1): existing objects get the archip prefix, non-existing
 * ones a bare hexlified digest.
 */
static inline void map_to_object(struct map_node *mn, unsigned char *buf)
	mn->flags |= MF_OBJECT_EXIST;
	strcpy(mn->object, MAPPER_PREFIX);
	hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
	mn->object[MAX_OBJECT_LEN] = 0;
	mn->objectlen = strlen(mn->object);
	hexlify(buf+1, mn->object);
	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
	mn->objectlen = strlen(mn->object);
/*
 * Serialize 'mn' to on-disk form (legacy duplicate of v1_object_to_map):
 * transparency byte, then the unhexlified digest, prefix-stripped when the
 * object exists.
 */
static inline void object_to_map(char* buf, struct map_node *mn)
	buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
	/* strip common prefix */
	unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
	unhexlify(mn->object, (unsigned char *)(buf+1));
911 static inline void mapheader_to_map(struct map *m, char *buf)
914 memcpy(buf + pos, &m->version, sizeof(m->version));
915 pos += sizeof(m->version);
916 memcpy(buf + pos, &m->size, sizeof(m->size));
917 pos += sizeof(m->size);
/*
 * Allocate and submit a request that writes one object's map entry to the
 * maps blocker, using the version-dispatched prepare_write_object. Returns
 * the in-flight request; error paths detach data and put the request back.
 */
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
				struct map *map, struct map_node *mn)
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
		XSEGLOG2(&lc, E, "Cannot prepare write object");
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
				mn->object, map->volume, (unsigned long long) mn->objectidx);
	r = xseg_signal(peer->xseg, p);
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	XSEGLOG2(&lc, I, "Writing object %s \n\t"
			mn->object, map->volume, (unsigned long long) mn->objectidx);
	/* error unwind path */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
	XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
			mn->object, map->volume, (unsigned long long) mn->objectidx);
/*
 * Allocate and submit a request that rewrites the whole map via the
 * version-dispatched prepare_write_map. Sets MF_MAP_WRITING on success;
 * error paths detach data and put the request back.
 */
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
						mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
	r = map_functions[map->version].prepare_write_map(pr, map, req);
		XSEGLOG2(&lc, E, "Cannot prepare write map");
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
		XSEGLOG2(&lc, W, "Cannot signal port %u", p);
	map->flags |= MF_MAP_WRITING;
	XSEGLOG2(&lc, I, "Writing map %s", map->volume);
	/* error unwind path */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
	XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
/*
 * Synchronous wrapper around __write_map: submit, sleep on pr until the
 * request is served or failed, release the request, clear MF_MAP_WRITING.
 */
static int write_map(struct peer_req* pr, struct map *map)
	struct peerd *peer = pr->peer;
	struct xseg_request *req = __write_map(pr, map);
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	if (req->state & XS_FAILED)
	xseg_put_request(peer->xseg, req, pr->portno);
	map->flags &= ~MF_MAP_WRITING;
/*
 * Allocate and submit a block_size read of the map object from the maps
 * blocker. Sets MF_MAP_LOADING on success and returns the in-flight
 * request; error paths detach data and put the request back.
 */
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, m->volume, req->targetlen);
	req->size = block_size;
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
	m->flags |= MF_MAP_LOADING;
	XSEGLOG2(&lc, I, "Map %s loading", m->volume);
	/* error unwind path */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Parse a freshly loaded on-disk map blob: reject an all-zero buffer,
 * detect archip vs pithos maps by the volume-name prefix, validate the
 * embedded version, then dispatch to map_functions[version].read_map.
 */
static int read_map (struct map *map, unsigned char *buf)
	char nulls[SHA256_DIGEST_SIZE];
	memset(nulls, 0, SHA256_DIGEST_SIZE);
	int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
		XSEGLOG2(&lc, E, "Read zeros");
	//type 1, archip type, type 0 pithos map
	int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
	XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
	version = *(uint32_t *) (buf); //version should always be the first uint32_t
	if (version > MAP_LATEST_VERSION){
		XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
				map->volume, version);
	r = map_functions[version].read_map(map, buf);
		XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
	XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
/*
 * Synchronous wrapper: submit __load_map, sleep on pr until the request is
 * served or failed, clear MF_MAP_LOADING, then parse the returned data
 * with read_map. Releases the request on all paths.
 */
static int load_map(struct peer_req *pr, struct map *map)
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	req = __load_map(pr, map);
	wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
	map->flags &= ~MF_MAP_LOADING;
	if (req->state & XS_FAILED){
		XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
		xseg_put_request(peer->xseg, req, pr->portno);
	r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Allocate and submit an X_ACQUIRE request on the map's volume (lock via
 * the maps blocker). XF_NOSYNC is set unless the caller passed MF_FORCE.
 * Sets MF_MAP_OPENING on success; error paths detach data and put the
 * request back.
 */
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	XSEGLOG2(&lc, I, "Opening map %s", m->volume);
	req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
	r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
		XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
	char *reqtarget = xseg_get_target(peer->xseg, req);
	strncpy(reqtarget, m->volume, req->targetlen);
	req->op = X_ACQUIRE;
	req->size = block_size;
	if (!(flags & MF_FORCE))
		req->flags = XF_NOSYNC;
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for map %s",
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit request for map %s",
	r = xseg_signal(peer->xseg, p);
	m->flags |= MF_MAP_OPENING;
	XSEGLOG2(&lc, I, "Map %s opening", m->volume);
	/* error unwind path */
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Synchronous wrapper around __open_map: submit, sleep on pr until the
 * acquire is served or failed, clear MF_MAP_OPENING; on success the map is
 * marked MF_MAP_EXCLUSIVE (lock held).
 */
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
	struct xseg_request *req;
	struct peerd *peer = pr->peer;
	req = __open_map(pr, map, flags);
	wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
	map->flags &= ~MF_MAP_OPENING;
	err = req->state & XS_FAILED;
	xseg_put_request(peer->xseg, req, pr->portno);
	map->flags |= MF_MAP_EXCLUSIVE;
/*
 * Associate (or, when mn is NULL, dissociate) an in-flight copyup request
 * with its map_node in mio->copyups_nodes, growing/shrinking the hash and
 * retrying on -XHASH_ERESIZE.
 */
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
	XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
	r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
	if (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
		xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
		mio->copyups_nodes = new_hashmap;
		r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
		XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
	/* mn == NULL: drop the association instead */
	XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
	r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
	if (r == -XHASH_ERESIZE) {
		xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
		xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
		mio->copyups_nodes = new_hashmap;
		r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
		XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1282 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1284 struct map_node *mn;
1285 int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1287 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1290 XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
/*
 * Allocate and submit an X_SNAPSHOT request for mn's object to the data
 * blocker (mapper->bportno), with an empty snapshot target, and record the
 * req->map_node association in mio. Sets MF_OBJECT_SNAPSHOTTING on
 * success; error paths drop the association, detach data and put the
 * request back.
 */
static struct xseg_request * __snapshot_object(struct peer_req *pr,
						struct map_node *mn)
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct mapper_io *mio = __get_mapper_io(pr);
	//struct map *map = mn->map;
	//assert mn->volume != zero_block
	//assert mn->flags & MF_OBJECT_EXIST
	struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
							mapper->bportno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
	r = xseg_prep_request(peer->xseg, req, mn->objectlen,
				sizeof(struct xseg_request_snapshot));
		XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
	char *target = xseg_get_target(peer->xseg, req);
	strncpy(target, mn->object, req->targetlen);
	struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
	xsnapshot->target[0] = 0;
	xsnapshot->targetlen = 0;
	req->size = block_size;
	req->op = X_SNAPSHOT;
	r = xseg_set_req_data(peer->xseg, req, pr);
		XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
	r = __set_copyup_node(mio, req, mn);
	p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
		XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
		goto out_mapper_unset;
	xseg_signal(peer->xseg, p);
	mn->flags |= MF_OBJECT_SNAPSHOTTING;
	XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
	/* error unwind path */
	__set_copyup_node(mio, req, NULL);
	xseg_get_req_data(peer->xseg, req, &dummy);
	xseg_put_request(peer->xseg, req, pr->portno);
	XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
/*
 * Issue an asynchronous X_COPY of mn's backing object to a new CA-named
 * object ("<MAPPER_PREFIX><hex sha256 of volume_objectidx>").  If the source
 * is the zero block, skip the copy and go straight to writing the new map
 * entry.  Returns the submitted request, or NULL on failure (elided error
 * paths).  Completion is handled by copyup_cb via the copyup-node mapping.
 * NOTE(review): listing is elided; intermediate error checks not shown.
 */
1360 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1362 struct mapperd *mapper = __get_mapperd(peer);
1363 struct mapper_io *mio = __get_mapper_io(pr);
1364 struct map *map = mn->map;
1369 uint32_t newtargetlen;
1370 char new_target[MAX_OBJECT_LEN + 1];
1371 unsigned char sha[SHA256_DIGEST_SIZE];
/* build the new object name: MAPPER_PREFIX + hexlified SHA256("volume_objectidx") */
1373 strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1375 char tmp[XSEG_MAX_TARGETLEN + 1];
1377 strncpy(tmp, map->volume, map->volumelen);
1378 sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1379 tmp[XSEG_MAX_TARGETLEN] = 0;
1380 tmplen = strlen(tmp);
1381 SHA256((unsigned char *)tmp, tmplen, sha);
1382 hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1383 newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
/* zero block needs no data copy — just record the new object in the map */
1386 if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1387 goto copyup_zeroblock;
1389 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1390 mapper->bportno, X_ALLOC);
1392 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1395 r = xseg_prep_request(peer->xseg, req, newtargetlen,
1396 sizeof(struct xseg_request_copy));
1398 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1402 char *target = xseg_get_target(peer->xseg, req);
1403 strncpy(target, new_target, req->targetlen);
/* copy payload: source object name goes in the xseg_request_copy data */
1405 struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1406 strncpy(xcopy->target, mn->object, mn->objectlen);
1407 xcopy->targetlen = mn->objectlen;
1410 req->size = block_size;
1412 r = xseg_set_req_data(peer->xseg, req, pr);
1414 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
/* remember which map node this request belongs to, for the callback */
1417 r = __set_copyup_node(mio, req, mn);
1420 p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1422 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1423 goto out_mapper_unset;
1425 xseg_signal(peer->xseg, p);
1428 mn->flags |= MF_OBJECT_COPYING;
1429 XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
/* error unwind: unset copyup node, clear req data, release the request */
1433 __set_copyup_node(mio, req, NULL);
1435 xseg_get_req_data(peer->xseg, req, &dummy);
1437 xseg_put_request(peer->xseg, req, pr->portno);
1439 XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1443 XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1444 "Proceeding in writing the new object in map");
1445 /* construct a tmp map_node for writing purposes */
1446 struct map_node newmn = *mn;
1447 newmn.flags = MF_OBJECT_EXIST;
1448 strncpy(newmn.object, new_target, newtargetlen);
1449 newmn.object[newtargetlen] = 0;
1450 newmn.objectlen = newtargetlen;
1451 newmn.objectidx = mn->objectidx;
1452 req = object_write(peer, pr, map, &newmn);
1453 r = __set_copyup_node(mio, req, mn);
1457 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1458 "\n\t of map %s [%llu]",
1459 mn->object, map->volume, (unsigned long long) mn->objectidx);
1462 mn->flags |= MF_OBJECT_WRITING;
1463 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
/*
 * Submit an asynchronous X_DELETE for mn's backing object on the blocker
 * port.  Marks the node MF_OBJECT_DELETING; deletion_cb/snapshot_cb handle
 * completion via the copyup-node mapping.  Returns the request or NULL.
 * NOTE(review): listing is elided; the if(!req)/if(r<0) guards are not shown.
 */
1467 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1470 struct peerd *peer = pr->peer;
1471 struct mapperd *mapper = __get_mapperd(peer);
1472 struct mapper_io *mio = __get_mapper_io(pr);
1473 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1474 mapper->bportno, X_ALLOC);
1475 XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1477 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1480 int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1482 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1485 char *target = xseg_get_target(peer->xseg, req);
1486 strncpy(target, mn->object, req->targetlen);
1488 req->size = req->datalen;
1490 r = xseg_set_req_data(peer->xseg, req, pr);
1492 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
/* associate request with mn so the completion callback can find it */
1495 r = __set_copyup_node(mio, req, mn);
1498 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1500 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1501 goto out_mapper_unset;
1503 r = xseg_signal(peer->xseg, p);
1504 mn->flags |= MF_OBJECT_DELETING;
1505 XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
/* error unwind */
1509 __set_copyup_node(mio, req, NULL);
1511 xseg_get_req_data(peer->xseg, req, &dummy);
1513 xseg_put_request(peer->xseg, req, pr->portno);
1515 XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
/*
 * Submit an asynchronous X_DELETE of the map object itself on the map
 * blocker port (mbportno, unlike __delete_object which uses bportno).
 * Marks the map MF_MAP_DELETING.  Returns the request or NULL.
 * NOTE(review): listing is elided; error-branch guards are not shown.
 */
1519 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1522 struct peerd *peer = pr->peer;
1523 struct mapperd *mapper = __get_mapperd(peer);
1524 struct mapper_io *mio = __get_mapper_io(pr);
1525 struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1526 mapper->mbportno, X_ALLOC);
1527 XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1529 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1532 int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1534 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1537 char *target = xseg_get_target(peer->xseg, req);
1538 strncpy(target, map->volume, req->targetlen);
1540 req->size = req->datalen;
1542 r = xseg_set_req_data(peer->xseg, req, pr);
1544 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1547 /* do not check return value. just make sure there is no node set */
1548 __set_copyup_node(mio, req, NULL);
1549 xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1551 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1554 r = xseg_signal(peer->xseg, p);
1555 map->flags |= MF_MAP_DELETING;
1556 XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
/* error unwind */
1560 xseg_get_req_data(peer->xseg, req, &dummy);
1562 xseg_put_request(peer->xseg, req, pr->portno);
1564 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
/* Look up the map node at index, presumably taking a reference that the
 * caller must drop with put_mapnode() — TODO confirm (ref++ lines elided). */
1569 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1571 struct map_node *mn = find_object(map, index);
/* Drop a map-node reference; on last put the node's condition variable is
 * destroyed (remaining teardown lines elided from this listing). */
1577 static inline void put_mapnode(struct map_node *mn)
1582 st_cond_destroy(mn->cond);
/* Take a reference on the map (body elided from this listing). */
1586 static inline void __get_map(struct map *map)
/*
 * Drop a map reference; on last put, wait out any pending per-object
 * operations, release every map node (both the init ref and our lookup
 * ref), then free the map itself.
 */
1591 static inline void put_map(struct map *map)
1593 struct map_node *mn;
1596 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1599 for (i = 0; i < calc_map_obj(map); i++) {
1600 mn = get_mapnode(map, i);
1602 //make sure all pending operations on all objects are completed
1603 //this should never happen...
1604 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1605 mn->flags |= MF_OBJECT_DESTROYED;
1606 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1607 put_mapnode(mn); //matcing get_mapnode;
1608 //assert mn->ref == 0;
1611 mn = find_object(map, 0);
1614 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
/*
 * Allocate and register a new in-memory map.  MF_ARCHIP names get the
 * MAPPER_PREFIX prepended and version 1; plain names are version 0
 * (pithos maps).  Returns the map or NULL (error paths elided).
 */
1619 static struct map * create_map(struct mapperd *mapper, char *name,
1620 uint32_t namelen, uint32_t flags)
1623 if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1624 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1625 namelen, MAX_VOLUME_LEN);
1628 struct map *m = malloc(sizeof(struct map));
1630 XSEGLOG2(&lc, E, "Cannot allocate map ");
1634 if (flags & MF_ARCHIP){
1635 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1636 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1637 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1638 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1639 m->version = 1; /* keep this hardcoded for now */
1642 strncpy(m->volume, name, namelen);
1643 m->volume[namelen] = 0;
1644 m->volumelen = namelen;
1645 m->version = 0; /* version 0 should be pithos maps */
/* per-map object index: objectidx -> map_node */
1648 m->objects = xhash_new(3, INTEGER);
1650 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1656 m->cond = st_cond_new(); //FIXME err check;
1657 r = insert_map(mapper, m);
1659 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
/* error unwind */
1666 xhash_free(m->objects);
1668 XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
/*
 * Completion callback for per-object X_DELETE requests issued by
 * do_destroy.  Clears MF_OBJECT_DELETING, marks the node destroyed and
 * releases the request.  del_pending accounting lines are elided here.
 */
1676 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1678 struct peerd *peer = pr->peer;
1679 struct mapperd *mapper = __get_mapperd(peer);
1681 struct mapper_io *mio = __get_mapper_io(pr);
1682 struct map_node *mn = __get_copyup_node(mio, req);
1684 __set_copyup_node(mio, req, NULL);
1686 //assert req->op = X_DELETE;
1687 //assert pr->req->op = X_DELETE only map deletions make delete requests
1688 //assert mio->del_pending > 0
1689 XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1692 if (req->state & XS_FAILED){
1696 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1697 mn, mn->object, mio, req);
1698 // assert mn->flags & MF_OBJECT_DELETING
1699 mn->flags &= ~MF_OBJECT_DELETING;
1700 mn->flags |= MF_OBJECT_DESTROYED;
1702 /* put mapnode here, matches get_mapnode on do_destroy */
1705 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1708 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Completion callback for the snapshot pipeline.  Each object goes through
 * up to three chained requests: X_SNAPSHOT (blocker snapshots the data
 * object) -> X_WRITE (persist the new object name into the map) ->
 * X_DELETE (best-effort removal of the old object).  The map node for a
 * request is recovered through the mio copyup-node mapping.
 *
 * FIX(review): in the X_SNAPSHOT branch the new object name is copied with
 * xreply->targetlen bytes but was NUL-terminated and length-recorded with
 * req->targetlen (the *request's* target length, i.e. the old object name
 * length).  If those lengths differ the new name is truncated or left with
 * a stale length.  Both uses now consistently use xreply->targetlen.
 */
1712 void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1714 struct peerd *peer = pr->peer;
1715 struct mapperd *mapper = __get_mapperd(peer);
1717 struct mapper_io *mio = __get_mapper_io(pr);
1718 struct map_node *mn = __get_copyup_node(mio, req);
1720 XSEGLOG2(&lc, E, "Cannot get map node");
1723 __set_copyup_node(mio, req, NULL);
1725 if (req->state & XS_FAILED){
1726 if (req->op == X_DELETE){
1727 XSEGLOG2(&lc, E, "Delete req failed");
1730 XSEGLOG2(&lc, E, "Req failed");
1731 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1732 mn->flags &= ~MF_OBJECT_WRITING;
1736 if (req->op == X_WRITE) {
1737 char old_object_name[MAX_OBJECT_LEN + 1];
1738 uint32_t old_objectlen;
1740 char *target = xseg_get_target(peer->xseg, req);
1742 //assert mn->flags & MF_OBJECT_WRITING
1743 mn->flags &= ~MF_OBJECT_WRITING;
/* save the old name before the node is repointed at the snapshot object */
1744 strncpy(old_object_name, mn->object, mn->objectlen);
1745 old_objectlen = mn->objectlen;
1747 struct map_node tmp;
1748 char *data = xseg_get_data(peer->xseg, req);
1749 map_to_object(&tmp, (unsigned char *) data);
1750 mn->flags &= ~MF_OBJECT_EXIST;
1752 strncpy(mn->object, tmp.object, tmp.objectlen);
1753 mn->object[tmp.objectlen] = 0;
1754 mn->objectlen = tmp.objectlen;
1755 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1756 //signal_mapnode since Snapshot was successfull
1759 //do delete old object
1760 strncpy(tmp.object, old_object_name, old_objectlen);
1761 tmp.object[old_objectlen] = 0;
1762 tmp.objectlen = old_objectlen;
1763 tmp.flags = MF_OBJECT_EXIST;
1764 struct xseg_request *xreq = __delete_object(pr, &tmp);
1766 //just a warning. Snapshot was successfull
1767 XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1770 //overwrite copyup node, since tmp is a stack dummy variable
1771 __set_copyup_node (mio, xreq, mn);
1772 XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1775 } else if (req->op == X_SNAPSHOT) {
1776 //issue write_object;
1777 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1778 struct map *map = mn->map;
1780 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1784 /* construct a tmp map_node for writing purposes */
1785 //char *target = xseg_get_target(peer->xseg, req);
1786 struct map_node newmn = *mn;
1788 struct xseg_reply_snapshot *xreply;
1789 xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1790 //assert xreply->targetlen !=0
1791 //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1792 //xreply->target[xreply->targetlen] = 0;
1793 //assert xreply->target valid
1794 strncpy(newmn.object, xreply->target, xreply->targetlen);
/* terminate/size with the reply's length, not req->targetlen (bug fix) */
1795 newmn.object[xreply->targetlen] = 0;
1796 newmn.objectlen = xreply->targetlen;
1797 newmn.objectidx = mn->objectidx;
1798 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1800 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1801 "\n\t of map %s [%llu]",
1802 mn->object, map->volume, (unsigned long long) mn->objectidx);
1805 mn->flags |= MF_OBJECT_WRITING;
1806 __set_copyup_node (mio, xreq, mn);
1808 XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1809 } else if (req->op == X_DELETE){
1810 //deletion of the old block completed
1811 XSEGLOG2(&lc, I, "Deletion of completed");
1820 xseg_put_request(peer->xseg, req, pr->portno);
1824 mio->snap_pending--;
1825 XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1833 mio->snap_pending--;
/*
 * Completion callback for the copy-up pipeline: X_COPY (duplicate the data
 * object) is followed by X_WRITE (persist the new object name in the map).
 * The map node is recovered via the mio copyup-node mapping.
 */
1839 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1841 struct peerd *peer = pr->peer;
1842 struct mapperd *mapper = __get_mapperd(peer);
1844 struct mapper_io *mio = __get_mapper_io(pr);
1845 struct map_node *mn = __get_copyup_node(mio, req);
1847 XSEGLOG2(&lc, E, "Cannot get map node");
1850 __set_copyup_node(mio, req, NULL);
1852 if (req->state & XS_FAILED){
1853 XSEGLOG2(&lc, E, "Req failed");
1854 mn->flags &= ~MF_OBJECT_COPYING;
1855 mn->flags &= ~MF_OBJECT_WRITING;
1858 if (req->op == X_WRITE) {
1859 char *target = xseg_get_target(peer->xseg, req);
1861 //printf("handle object write replyi\n");
1862 __set_copyup_node(mio, req, NULL);
1863 //assert mn->flags & MF_OBJECT_WRITING
1864 mn->flags &= ~MF_OBJECT_WRITING;
1866 struct map_node tmp;
1867 char *data = xseg_get_data(peer->xseg, req);
1868 map_to_object(&tmp, (unsigned char *) data);
1869 mn->flags |= MF_OBJECT_EXIST;
/* NOTE(review): |= then a != MF_OBJECT_EXIST equality check — this rejects
 * any leftover flag bits, not just a missing EXIST bit; confirm intent. */
1870 if (mn->flags != MF_OBJECT_EXIST){
1871 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1874 //assert mn->flags & MF_OBJECT_EXIST
1875 strncpy(mn->object, tmp.object, tmp.objectlen);
1876 mn->object[tmp.objectlen] = 0;
1877 mn->objectlen = tmp.objectlen;
1878 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1882 } else if (req->op == X_COPY) {
1883 // issue write_object;
1884 mn->flags &= ~MF_OBJECT_COPYING;
1885 struct map *map = mn->map;
1887 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1891 /* construct a tmp map_node for writing purposes */
1892 char *target = xseg_get_target(peer->xseg, req);
1893 struct map_node newmn = *mn;
1894 newmn.flags = MF_OBJECT_EXIST;
1895 strncpy(newmn.object, target, req->targetlen);
1896 newmn.object[req->targetlen] = 0;
1897 newmn.objectlen = req->targetlen;
1898 newmn.objectidx = mn->objectidx;
1899 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1901 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1902 "\n\t of map %s [%llu]",
1903 mn->object, map->volume, (unsigned long long) mn->objectidx);
1906 mn->flags |= MF_OBJECT_WRITING;
1907 __set_copyup_node (mio, xreq, mn);
1909 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1916 xseg_put_request(peer->xseg, req, pr->portno);
1921 XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1931 struct map_node *mn;
/*
 * Translate the byte range of pr->req into a scatter list of map objects.
 * Computes per-object (offset, size) spans, copies up missing objects when
 * write != 0, waits for busy nodes, then resizes pr->req and fills in an
 * xseg_reply_map.  Returns 0 on success, negative on error (elided paths).
 */
1936 static int req2objs(struct peer_req *pr, struct map *map, int write)
1939 struct peerd *peer = pr->peer;
1940 struct mapper_io *mio = __get_mapper_io(pr);
1941 char *target = xseg_get_target(peer->xseg, pr->req);
1942 uint32_t nr_objs = calc_nr_obj(pr->req);
1943 uint64_t size = sizeof(struct xseg_reply_map) +
1944 nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1946 uint64_t rem_size, obj_index, obj_offset, obj_size;
1947 struct map_node *mn;
1949 XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1951 /* get map_nodes of request */
1952 struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1954 XSEGLOG2(&lc, E, "Cannot allocate mns");
/* first object may start mid-block; block_size is assumed a power of two */
1958 rem_size = pr->req->size;
1959 obj_index = pr->req->offset / block_size;
1960 obj_offset = pr->req->offset & (block_size -1); //modulo
1961 obj_size = (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1962 mn = get_mapnode(map, obj_index);
1964 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1969 mns[idx].offset = obj_offset;
1970 mns[idx].size = obj_size;
1971 rem_size -= obj_size;
/* subsequent objects are whole blocks except possibly the last */
1972 while (rem_size > 0) {
1976 obj_size = (rem_size > block_size) ? block_size : rem_size;
1977 rem_size -= obj_size;
1978 mn = get_mapnode(map, obj_index);
1980 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1985 mns[idx].offset = obj_offset;
1986 mns[idx].size = obj_size;
1991 /* do a first scan and issue as many copyups as we can.
1992 * then retry and wait when an object is not ready.
1993 * this could be done better, since now we wait also on the
1997 for (j = 0; j < 2 && !mio->err; j++) {
1998 for (i = 0; i < (idx+1); i++) {
2001 if (mn->flags & MF_OBJECT_NOT_READY){
2004 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2005 if (mn->flags & MF_OBJECT_DESTROYED){
2011 if (!(mn->flags & MF_OBJECT_EXIST)) {
2012 //calc new_target, copy up object
2013 if (copyup_object(peer, mn, pr) == NULL){
2014 XSEGLOG2(&lc, E, "Error in copy up object");
2022 XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
/* block until all outstanding copyups for this request complete */
2028 wait_on_pr(pr, mio->copyups > 0);
2033 XSEGLOG2(&lc, E, "Mio->err");
2037 /* resize request to fit reply */
2038 char buf[XSEG_MAX_TARGETLEN];
2039 strncpy(buf, target, pr->req->targetlen);
2040 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2042 XSEGLOG2(&lc, E, "Cannot resize request");
2045 target = xseg_get_target(peer->xseg, pr->req);
2046 strncpy(target, buf, pr->req->targetlen);
2048 /* structure reply */
2049 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2050 reply->cnt = nr_objs;
2051 for (i = 0; i < (idx+1); i++) {
2052 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2053 reply->segs[i].targetlen = mns[i].mn->objectlen;
2054 reply->segs[i].offset = mns[i].offset;
2055 reply->segs[i].size = mns[i].size;
/* NOTE(review): this cleanup loop runs to idx while the fill loop above
 * runs to idx+1 — confirm the last node's reference is dropped elsewhere */
2058 for (i = 0; i < idx; i++) {
2059 put_mapnode(mns[i].mn);
/*
 * Evict a map from the in-memory cache: mark every node destroyed (after
 * waiting out pending per-object operations), remove the map from the
 * mapper hash, and drop the creation reference.
 */
2066 static int do_dropcache(struct peer_req *pr, struct map *map)
2068 struct map_node *mn;
2069 struct peerd *peer = pr->peer;
2070 struct mapperd *mapper = __get_mapperd(peer);
2072 XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2073 map->flags |= MF_MAP_DROPPING_CACHE;
2074 for (i = 0; i < calc_map_obj(map); i++) {
2075 mn = get_mapnode(map, i);
2077 if (!(mn->flags & MF_OBJECT_DESTROYED)){
2078 //make sure all pending operations on all objects are completed
2079 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2080 mn->flags |= MF_OBJECT_DESTROYED;
2085 map->flags &= ~MF_MAP_DROPPING_CACHE;
2086 map->flags |= MF_MAP_DESTROYED;
2087 remove_map(mapper, map);
2088 XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2089 put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
/* Fill the X_INFO reply with the map's total size. */
2093 static int do_info(struct peer_req *pr, struct map *map)
2095 struct peerd *peer = pr->peer;
2096 struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2097 xinfo->size = map->size;
/* X_OPEN handler: succeeds only if the map is held exclusively
 * (remaining body elided from this listing). */
2102 static int do_open(struct peer_req *pr, struct map *map)
2104 if (map->flags & MF_MAP_EXCLUSIVE){
/* X_CLOSE handler: release the exclusive lock (if held), then drop the
 * cached map — unless close failed on a still-existing map. */
2112 static int do_close(struct peer_req *pr, struct map *map)
2114 if (map->flags & MF_MAP_EXCLUSIVE){
2115 /* do not drop cache if close failed and map not deleted */
2116 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2119 return do_dropcache(pr, map);
/*
 * X_SNAPSHOT handler.  Requires exclusive map access.  Issues a throttled
 * X_SNAPSHOT per existing object (completions handled by snapshot_cb),
 * waits for all to drain, then serializes the map, names the snapshot by
 * the SHA256 of its v0 serialization, writes it out, and returns the new
 * name in an xseg_reply_snapshot.
 */
2122 static int do_snapshot(struct peer_req *pr, struct map *map)
2125 struct peerd *peer = pr->peer;
2126 struct mapper_io *mio = __get_mapper_io(pr);
2127 struct map_node *mn;
2128 struct xseg_request *req;
2130 if (!(map->flags & MF_MAP_EXCLUSIVE)){
2131 XSEGLOG2(&lc, E, "Map was not opened exclusively");
2134 XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2135 map->flags |= MF_MAP_SNAPSHOTTING;
2137 uint64_t nr_obj = calc_map_obj(map);
2138 mio->cb = snapshot_cb;
2139 mio->snap_pending = 0;
2141 for (i = 0; i < nr_obj; i++){
2143 /* throttle pending snapshots
2144 * this should be nr_ops of the blocker, but since we don't know
2145 * that, we assume based on our own nr_ops
2147 wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2149 mn = get_mapnode(map, i);
/* zero-block / non-existent objects need no snapshot */
2153 if (!(mn->flags & MF_OBJECT_EXIST)){
2157 // make sure all pending operations on all objects are completed
2158 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2160 /* TODO will this ever happen?? */
2161 if (mn->flags & MF_OBJECT_DESTROYED){
2166 req = __snapshot_object(pr, mn);
2172 mio->snap_pending++;
2173 /* do not put_mapnode here. cb does that */
2176 wait_on_pr(pr, mio->snap_pending > 0);
2182 /* calculate name of snapshot */
2183 struct map tmp_map = *map;
2184 unsigned char sha[SHA256_DIGEST_SIZE];
2185 unsigned char *buf = malloc(block_size);
2186 char newvolumename[MAX_VOLUME_LEN];
2187 uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2189 uint64_t max_objidx = calc_map_obj(map);
2192 for (i = 0; i < max_objidx; i++) {
2193 mn = find_object(map, i);
2195 XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2196 (unsigned long long) i, map->volume);
2199 v0_object_to_map(mn, buf+pos);
2200 pos += v0_objectsize_in_map;
/* snapshot name = hex(SHA256(serialized map)) — content addressed */
2202 SHA256(buf, pos, sha);
2203 hexlify(sha, newvolumename);
2204 strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2205 tmp_map.volumelen = newvolumenamelen;
2207 tmp_map.version = 0; // set volume version to pithos image
2209 /* write the map of the Snapshot */
2210 r = write_map(pr, &tmp_map);
2214 r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2215 sizeof(struct xseg_reply_snapshot));
2217 XSEGLOG2(&lc, E, "Cannot resize request");
2220 struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2221 xseg_get_data(peer->xseg, pr->req);
2222 strncpy(xreply->target, newvolumename, newvolumenamelen);
2223 xreply->targetlen = newvolumenamelen;
2224 map->flags &= ~MF_MAP_SNAPSHOTTING;
2225 XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
/* error path */
2229 map->flags &= ~MF_MAP_SNAPSHOTTING;
2230 XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
/*
 * X_DELETE handler.  Requires exclusive access.  First synchronously
 * deletes the map object itself, then issues throttled per-object
 * X_DELETEs (completions in deletion_cb), waits for them to drain, marks
 * the map deleted and closes it (which also drops the cache).
 */
2235 static int do_destroy(struct peer_req *pr, struct map *map)
2238 struct peerd *peer = pr->peer;
2239 struct mapper_io *mio = __get_mapper_io(pr);
2240 struct map_node *mn;
2241 struct xseg_request *req;
2243 if (!(map->flags & MF_MAP_EXCLUSIVE))
2246 XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
2247 req = __delete_map(pr, map);
/* wait for the map-object deletion to finish before touching objects */
2250 wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
2251 if (req->state & XS_FAILED){
2252 xseg_put_request(peer->xseg, req, pr->portno);
2253 map->flags &= ~MF_MAP_DELETING;
2256 xseg_put_request(peer->xseg, req, pr->portno);
2258 uint64_t nr_obj = calc_map_obj(map);
2259 mio->cb = deletion_cb;
2260 mio->del_pending = 0;
2262 for (i = 0; i < nr_obj; i++){
2264 /* throttle pending deletions
2265 * this should be nr_ops of the blocker, but since we don't know
2266 * that, we assume based on our own nr_ops
2268 wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
2270 mn = get_mapnode(map, i);
2273 if (mn->flags & MF_OBJECT_DESTROYED){
/* zero-block entries have no backing object to delete */
2277 if (!(mn->flags & MF_OBJECT_EXIST)){
2278 mn->flags |= MF_OBJECT_DESTROYED;
2283 // make sure all pending operations on all objects are completed
2284 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2286 req = __delete_object(pr, mn);
2293 /* do not put_mapnode here. cb does that */
2296 wait_on_pr(pr, mio->del_pending > 0);
2299 map->flags &= ~MF_MAP_DELETING;
2300 map->flags |= MF_MAP_DELETED;
2301 XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
2302 return do_close(pr, map);
/* X_MAPR handler: delegate to req2objs in read mode (write=0) and dump the
 * resulting scatter list at debug level. */
2305 static int do_mapr(struct peer_req *pr, struct map *map)
2307 struct peerd *peer = pr->peer;
2308 int r = req2objs(pr, map, 0);
2310 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2312 (unsigned long long) pr->req->offset,
2313 (unsigned long long) (pr->req->offset + pr->req->size));
2316 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2318 (unsigned long long) pr->req->offset,
2319 (unsigned long long) (pr->req->offset + pr->req->size));
2320 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2321 (unsigned long long) pr->req->offset,
2322 (unsigned long long) pr->req->size);
2323 char buf[XSEG_MAX_TARGETLEN+1];
2324 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2326 for (i = 0; i < reply->cnt; i++) {
2327 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2328 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2329 buf[reply->segs[i].targetlen] = 0;
2330 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2331 (unsigned long long) reply->segs[i].offset,
2332 (unsigned long long) reply->segs[i].size);
/* X_MAPW handler: delegate to req2objs in write mode (write=1, which may
 * trigger copy-ups) and dump the resulting scatter list at debug level. */
2337 static int do_mapw(struct peer_req *pr, struct map *map)
2339 struct peerd *peer = pr->peer;
2340 int r = req2objs(pr, map, 1);
2342 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2344 (unsigned long long) pr->req->offset,
2345 (unsigned long long) (pr->req->offset + pr->req->size));
2348 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2350 (unsigned long long) pr->req->offset,
2351 (unsigned long long) (pr->req->offset + pr->req->size));
2352 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2353 (unsigned long long) pr->req->offset,
2354 (unsigned long long) pr->req->size);
2355 char buf[XSEG_MAX_TARGETLEN+1];
2356 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2358 for (i = 0; i < reply->cnt; i++) {
2359 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2360 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2361 buf[reply->segs[i].targetlen] = 0;
2362 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2363 (unsigned long long) reply->segs[i].offset,
2364 (unsigned long long) reply->segs[i].size);
2369 //here map is the parent map
/*
 * X_CLONE handler.  Creates a new archip map whose entries point at the
 * parent's objects (copy-on-write via later copy-ups); indices beyond the
 * parent's end get the zero block.  Clone size defaults to the parent's
 * (xclone->size == -1) and must be >= parent size and <= MAX_VOLUME_SIZE.
 */
2370 static int do_clone(struct peer_req *pr, struct map *map)
2373 struct peerd *peer = pr->peer;
2374 struct mapperd *mapper = __get_mapperd(peer);
2375 char *target = xseg_get_target(peer->xseg, pr->req);
2376 struct map *clonemap;
2377 struct xseg_request_clone *xclone =
2378 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2380 XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
2382 clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2386 /* open map to get exclusive access to map */
2387 r = open_map(pr, clonemap, 0);
2389 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
2390 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
/* a loadable map under the clone name means the target already exists */
2393 r = load_map(pr, clonemap);
2395 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2399 if (xclone->size == -1)
2400 clonemap->size = map->size;
2402 clonemap->size = xclone->size;
2403 if (clonemap->size < map->size){
2404 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
2405 "\n\t for requested clone %s",
2406 (unsigned long long) xclone->size,
2407 (unsigned long long) map->size, clonemap->volume);
2410 if (clonemap->size > MAX_VOLUME_SIZE) {
2411 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
2412 "\n\t for volume %s",
2413 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
2417 //alloc and init map_nodes
2418 //unsigned long c = clonemap->size/block_size + 1;
2419 unsigned long c = calc_map_obj(clonemap);
2420 struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
2425 //for (i = 0; i < clonemap->size/block_size + 1; i++) {
2426 for (i = 0; i < c; i++) {
2427 struct map_node *mn = get_mapnode(map, i);
/* share the parent's object name; missing indices fall back to zero block */
2429 strncpy(map_nodes[i].object, mn->object, mn->objectlen);
2430 map_nodes[i].objectlen = mn->objectlen;
2433 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2434 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2436 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2437 map_nodes[i].flags = 0;
2438 map_nodes[i].objectidx = i;
2439 map_nodes[i].map = clonemap;
2440 map_nodes[i].ref = 1;
2441 map_nodes[i].waiters = 0;
2442 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2443 r = insert_object(clonemap, &map_nodes[i]);
2445 XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
2450 r = write_map(pr, clonemap);
2452 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
2455 do_close(pr, clonemap);
2459 do_close(pr, clonemap);
/* Optionally open the map exclusively (MF_EXCLUSIVE; MF_FORCE tolerates an
 * open failure), then load it; on load failure close what was opened. */
2463 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2466 if (flags & MF_EXCLUSIVE){
2467 r = open_map(pr, map, flags);
2469 if (flags & MF_FORCE){
2476 r = load_map(pr, map);
2477 if (r < 0 && opened){
/*
 * Find a cached map by name, or (with MF_LOAD) create and load it from
 * storage.  On load failure the fresh map's cache entry is dropped.
 * NOTE(review): listing is elided; return statements are not shown.
 */
2483 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2487 struct peerd *peer = pr->peer;
2488 struct mapperd *mapper = __get_mapperd(peer);
2489 struct map *map = find_map_len(mapper, name, namelen, flags);
2491 if (flags & MF_LOAD){
2492 map = create_map(mapper, name, namelen, flags);
2495 r = open_load_map(pr, map, flags);
2497 do_dropcache(pr, map);
/* a cached-but-destroyed map is treated as absent */
2503 } else if (map->flags & MF_MAP_DESTROYED){
/*
 * Common driver for all map operations: resolve the map (get_map), wait
 * until it is ready, run the given action, and drop the cache afterwards
 * unless the map is held exclusively.
 */
2511 static int map_action(int (action)(struct peer_req *pr, struct map *map),
2512 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2514 //struct peerd *peer = pr->peer;
2517 map = get_map(pr, name, namelen, flags);
2520 if (map->flags & MF_MAP_NOT_READY){
2521 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2525 int r = action(pr, map);
2526 //always drop cache if map not read exclusively
2527 if (!(map->flags & MF_MAP_EXCLUSIVE))
2528 do_dropcache(pr, map);
/* State-thread entry for X_INFO: run do_info via map_action. */
2534 void * handle_info(struct peer_req *pr)
2536 struct peerd *peer = pr->peer;
2537 char *target = xseg_get_target(peer->xseg, pr->req);
2538 int r = map_action(do_info, pr, target, pr->req->targetlen,
/*
 * State-thread entry for X_CLONE.  With a snap target, clone from that
 * (pithos) map via do_clone; otherwise create a brand-new volume of the
 * requested size filled with zero-block entries, write it out, and close.
 */
2548 void * handle_clone(struct peer_req *pr)
2551 struct peerd *peer = pr->peer;
2552 struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2558 if (xclone->targetlen){
2559 /* if snap was defined */
2560 //support clone only from pithos
2561 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
2564 /* else try to create a new volume */
2565 XSEGLOG2(&lc, I, "Creating volume");
2567 XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
2571 if (xclone->size > MAX_VOLUME_SIZE) {
2572 XSEGLOG2(&lc, E, "Requested size %llu > max volume "
2573 "size %llu", xclone->size, MAX_VOLUME_SIZE);
2579 char *target = xseg_get_target(peer->xseg, pr->req);
2581 //create a new empty map of size
2582 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2587 /* open map to get exclusive access to map */
2588 r = open_map(pr, map, 0);
2590 XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
2591 XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
2592 do_dropcache(pr, map);
/* if the map loads, a volume with this name already exists */
2596 r = load_map(pr, map);
2598 XSEGLOG2(&lc, E, "Map exists %s", map->volume);
2603 map->size = xclone->size;
2604 //populate_map with zero objects;
2605 uint64_t nr_objs = xclone->size / block_size;
2606 if (xclone->size % block_size)
2609 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2617 for (i = 0; i < nr_objs; i++) {
2618 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2619 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2620 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2621 map_nodes[i].flags = 0;
2622 map_nodes[i].objectidx = i;
2623 map_nodes[i].map = map;
2624 map_nodes[i].ref = 1;
2625 map_nodes[i].waiters = 0;
2626 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2627 r = insert_object(map, &map_nodes[i]);
2634 r = write_map(pr, map);
2636 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2640 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2642 do_close(pr, map); //drop cache here for consistency
/* State-thread entry for X_MAPR: run do_mapr with load + exclusive flags. */
2653 void * handle_mapr(struct peer_req *pr)
2655 struct peerd *peer = pr->peer;
2656 char *target = xseg_get_target(peer->xseg, pr->req);
2657 int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2658 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/* State-thread entry for X_MAPW: like mapr but forces the exclusive open,
 * since writes may require copy-ups and map updates. */
2667 void * handle_mapw(struct peer_req *pr)
2669 struct peerd *peer = pr->peer;
2670 char *target = xseg_get_target(peer->xseg, pr->req);
2671 int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2672 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2677 XSEGLOG2(&lc, D, "Ta: %d", ta);
/* State-thread entry for X_DELETE (volume destroy). */
2682 void * handle_destroy(struct peer_req *pr)
2684 struct peerd *peer = pr->peer;
2685 char *target = xseg_get_target(peer->xseg, pr->req);
2686 /* request EXCLUSIVE access, but do not force it.
2687 * check if succeeded on do_destroy
2689 int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2690 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/* State-thread entry for X_OPEN. */
2699 void * handle_open(struct peer_req *pr)
2701 struct peerd *peer = pr->peer;
2702 char *target = xseg_get_target(peer->xseg, pr->req);
2703 //here we do not want to load
2704 int r = map_action(do_open, pr, target, pr->req->targetlen,
2705 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/* State-thread entry for X_CLOSE (no MF_LOAD — operate on the cached map). */
2714 void * handle_close(struct peer_req *pr)
2716 struct peerd *peer = pr->peer;
2717 char *target = xseg_get_target(peer->xseg, pr->req);
2718 //here we do not want to load
2719 int r = map_action(do_close, pr, target, pr->req->targetlen,
2720 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
/*
 * st-thread entry point for an X_SNAPSHOT request.
 * Delegates to map_action() with do_snapshot.
 */
2729 void * handle_snapshot(struct peer_req *pr)
2731 struct peerd *peer = pr->peer;
2732 char *target = xseg_get_target(peer->xseg, pr->req);
2733 /* request EXCLUSIVE access, but do not force it.
2734 * check if it succeeded in do_snapshot
2736 int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2737 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
/*
 * Dispatch a freshly accepted xseg request: map the request opcode to the
 * matching handle_* routine and run it on a new st thread, leaving the
 * main loop free to accept further requests.
 *
 * NOTE(review): the default case only prints a diagnostic, so for an
 * unknown op `action` may still be NULL when st_thread_create() is
 * reached — lines hidden between the switch and the create call may
 * guard against this; confirm before relying on it.
 */
2746 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2747 struct xseg_request *req)
2749 //struct mapperd *mapper = __get_mapperd(peer);
2750 struct mapper_io *mio = __get_mapper_io(pr);
2751 void *(*action)(struct peer_req *) = NULL;
/* per-request mapper state machine starts in ACCEPTED */
2753 mio->state = ACCEPTED;
2756 switch (pr->req->op) {
2757 /* primary xseg operations of mapper */
2758 case X_CLONE: action = handle_clone; break;
2759 case X_MAPR: action = handle_mapr; break;
2760 case X_MAPW: action = handle_mapw; break;
2761 case X_SNAPSHOT: action = handle_snapshot; break;
2762 case X_INFO: action = handle_info; break;
2763 case X_DELETE: action = handle_destroy; break;
2764 case X_OPEN: action = handle_open; break;
2765 case X_CLOSE: action = handle_close; break;
2766 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
/* run the handler asynchronously on its own st thread */
2771 st_thread_create(action, pr, 0, 0);
/*
 * Top-level peer dispatch hook: route each event by its dispatch_reason.
 * Newly accepted requests go through dispatch_accepted(); handling of
 * other reasons (e.g. received replies) is in lines not shown here.
 */
2777 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2778 enum dispatch_reason reason)
2780 struct mapperd *mapper = __get_mapperd(peer);
2782 struct mapper_io *mio = __get_mapper_io(pr);
2786 if (reason == dispatch_accept)
2787 dispatch_accepted(peer, pr, req);
/*
 * Peer initialization: allocate the mapper's global state and the
 * per-request mapper_io structures, parse the blocker port arguments
 * (-bp for the data blocker, -mbp for the map blocker — both mandatory),
 * and tune xseg queue sizes for the starting port.
 */
2798 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2802 //FIXME error checks
2803 struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2804 peer->priv = mapperd;
/*
 * NOTE(review): `mapper` is used below but the local above is named
 * `mapperd` — a hidden line may alias them (e.g. another declaration);
 * otherwise this is an identifier mismatch. Confirm against full file.
 */
2806 mapper->hashmaps = xhash_new(3, STRING);
2808 for (i = 0; i < peer->nr_ops; i++) {
2809 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2810 mio->copyups_nodes = xhash_new(3, INTEGER);
2814 peer->peer_reqs[i].priv = mio;
/* -1 sentinels mean "not provided"; checked right after argument parsing */
2817 mapper->bportno = -1;
2818 mapper->mbportno = -1;
2819 BEGIN_READ_ARGS(argc, argv);
2820 READ_ARG_ULONG("-bp", mapper->bportno);
2821 READ_ARG_ULONG("-mbp", mapper->mbportno);
2823 if (mapper->bportno == -1){
2824 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2828 if (mapper->mbportno == -1){
2829 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
/* raise this thread to real-time FIFO scheduling at max priority */
2834 const struct sched_param param = { .sched_priority = 99 };
/* NOTE(review): "¶m" is almost certainly mojibake for "&param" — fix encoding */
2835 sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);
2836 /* FIXME maybe place it in peer
2837 * should be done for each port (sportno to eportno)
2839 xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2840 xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2848 /* FIXME this should not be here */
/*
 * Synchronously wait for the reply to `expected_req`, polling every port
 * this peer owns. Any other request received while waiting is treated as
 * stale: it is answered (or put back) and a warning is logged. Blocks in
 * xseg_wait_signal() between polling rounds.
 */
2849 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2851 struct xseg *xseg = peer->xseg;
2852 xport portno_start = peer->portno_start;
2853 xport portno_end = peer->portno_end;
2854 struct peer_req *pr;
2857 struct xseg_request *received;
2858 xseg_prepare_wait(xseg, portno_start);
2860 XSEGLOG2(&lc, D, "Attempting to check for reply");
/* scan every owned port for incoming requests */
2864 for (i = portno_start; i <= portno_end; i++) {
2865 received = xseg_receive(xseg, i, 0);
2868 r = xseg_get_req_data(xseg, received, (void **) &pr);
/* anything that is not the awaited reply is stale — respond or recycle it */
2869 if (r < 0 || !pr || received != expected_req){
2870 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2871 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2873 XSEGLOG2(&lc, W, "Could not respond stale request");
2874 xseg_put_request(xseg, received, portno_start);
2877 xseg_signal(xseg, p);
2880 xseg_cancel_wait(xseg, portno_start);
/* nothing yet: sleep up to 1s waiting for a signal, then re-poll */
2886 xseg_wait_signal(xseg, 1000000UL);
/*
 * Peer shutdown: walk the cached maps and, for every map this peer holds
 * exclusively, issue a close request and wait synchronously for its
 * completion so no map is left locked by a dead peer. Uses a single
 * borrowed peer_req for all close operations.
 */
2891 void custom_peer_finalize(struct peerd *peer)
2893 struct mapperd *mapper = __get_mapperd(peer);
2894 struct peer_req *pr = alloc_peer_req(peer);
2896 XSEGLOG2(&lc, E, "Cannot get peer request");
2900 struct xseg_request *req;
2903 xhash_iter_init(mapper->hashmaps, &it);
2904 while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2905 map = (struct map *)val;
/* only maps we own exclusively need an explicit close */
2906 if (!(map->flags & MF_MAP_EXCLUSIVE))
2908 req = __close_map(pr, map);
/* block until the blocker answers the close request */
2911 wait_reply(peer, req);
2912 if (!(req->state & XS_SERVED))
2913 XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2914 map->flags &= ~MF_MAP_CLOSING;
2915 xseg_put_request(peer->xseg, req, pr->portno);
/*
 * Debug helper: dump one map node (index, object name and length, and
 * whether the backing object exists) to stderr.
 */
2922 void print_obj(struct map_node *mn)
2924 fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
2925 (unsigned long long) mn->objectidx, mn->object,
2926 (unsigned int) mn->objectlen,
2927 (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
/*
 * Debug helper: dump a map's header (name, size, object count, version)
 * and then every object node found in it. Bails out when the computed
 * object count is implausibly large, as a guard against corrupt sizes.
 */
2930 void print_map(struct map *m)
/* number of objects = ceil(size / block_size); remainder handled below */
2932 uint64_t nr_objs = m->size/block_size;
2933 if (m->size % block_size)
2935 fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
2936 m->volume, m->volumelen,
2937 (unsigned long long) m->size,
2938 (unsigned long long) nr_objs,
2941 struct map_node *mn;
2942 if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2944 for (i = 0; i < nr_objs; i++) {
2945 mn = find_object(m, i);
2947 printf("object idx [%llu] not found!\n", (unsigned long long) i);
2955 void test_map(struct peerd *peer)
2958 //struct sha256_ctx sha256ctx;
2959 unsigned char buf[SHA256_DIGEST_SIZE];
2960 char buf_new[XSEG_MAX_TARGETLEN + 20];
2961 struct map *m = malloc(sizeof(struct map));
2962 strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2963 m->volume[XSEG_MAX_TARGETLEN] = 0;
2964 strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2965 buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2966 m->volumelen = XSEG_MAX_TARGETLEN;
2967 m->size = 100*block_size;
2968 m->objects = xhash_new(3, INTEGER);
2969 struct map_node *map_node = calloc(100, sizeof(struct map_node));
2970 for (i = 0; i < 100; i++) {
2971 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2972 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2974 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2975 sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2977 map_node[i].objectidx = i;
2978 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2979 map_node[i].flags = MF_OBJECT_EXIST;
2980 ret = insert_object(m, &map_node[i]);
2983 char *data = malloc(block_size);
2984 mapheader_to_map(m, data);
2985 uint64_t pos = mapheader_size;
2987 for (i = 0; i < 100; i++) {
2988 map_node = find_object(m, i);
2990 printf("no object node %d \n", i);
2993 object_to_map(data+pos, map_node);
2994 pos += objectsize_in_map;
2998 struct map *m2 = malloc(sizeof(struct map));
2999 strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3000 m->volume[XSEG_MAX_TARGETLEN] = 0;
3001 m->volumelen = XSEG_MAX_TARGETLEN;
3003 m2->objects = xhash_new(3, INTEGER);
3004 ret = read_map(peer, m2, data);
3007 int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3009 while (sum < block_size) {
3010 r = write(fd, data + sum, block_size -sum);
3013 printf("write error\n");
3019 map_node = find_object(m, 0);