From 40e56a42a5fa167c9424f784fe89c74addd145f1 Mon Sep 17 00:00:00 2001
From: Filippos Giannakos
Date: Wed, 19 Sep 2012 19:43:04 +0300
Subject: [PATCH] Fix various bugs in mt-mapperd. Also add copy support to
 filed and a magic field to the xheap header.

---
 xseg/peers/user/filed.c      |   48 ++++
 xseg/peers/user/mpeer.c      |    5 +-
 xseg/peers/user/mt-mapperd.c |  567 +++++++++++++++++++++++-------------------
 xseg/peers/user/xseg-tool.c  |    5 +
 xseg/xseg/xseg.c             |    7 +-
 xseg/xtypes/xheap.c          |    4 +
 xseg/xtypes/xheap.h          |    1 +
 7 files changed, 376 insertions(+), 261 deletions(-)

diff --git a/xseg/peers/user/filed.c b/xseg/peers/user/filed.c
index 61c483b..0be531b 100644
--- a/xseg/peers/user/filed.c
+++ b/xseg/peers/user/filed.c
@@ -12,6 +12,9 @@
 #include
 #include
 #include
+#include
+#include
+
 #define MAX_PATH_SIZE 255
 #define MAX_FILENAME_SIZE 255
@@ -423,6 +426,49 @@ static void handle_info(struct store *store, struct io *io)
 	complete(store, io);
 }
 
+static void handle_copy(struct store *store, struct io *io)
+{
+	struct xseg_request *req = io->req;
+	struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
+	struct stat st;
+	int n, src, dst;
+	char *target = xseg_get_target(store->xseg, req);
+
+	dst = dir_open(store, io, target, req->targetlen, 1);
+	if (dst < 0) {
+		fprintf(stderr, "fail in dst\n");
+		fail(store, io);
+		return;
+	}
+
+	src = openat(store->dirfd, xcopy->target, O_RDWR);
+	if (src < 0) {
+		fprintf(stderr, "fail in src\n");
+		fail(store, io);
+		return;
+	}
+
+	fstat(src, &st);
+	n = sendfile(dst, src, 0, st.st_size);
+	if (n != st.st_size) {
+		fprintf(stderr, "fail in copy\n");
+		fail(store, io);
+		goto out;
+	}
+
+	if (n < 0) {
+		fprintf(stderr, "fail in cp\n");
+		fail(store, io);
+		goto out;
+	}
+
+	complete(store, io);
+
+out:
+	close(src);
+}
+
+
 static void dispatch(struct store *store, struct io *io)
 {
 	if (verbose)
@@ -434,6 +480,8 @@ static void dispatch(struct store *store, struct io *io)
 		handle_read_write(store, io); break;
 	case X_INFO:
 		handle_info(store, io); break;
+	case X_COPY:
+		handle_copy(store, io); break;
 	case X_SYNC:
 	default:
 		handle_unknown(store, io);
diff --git a/xseg/peers/user/mpeer.c b/xseg/peers/user/mpeer.c
index 09d768e..dbad826 100644
--- a/xseg/peers/user/mpeer.c
+++ b/xseg/peers/user/mpeer.c
@@ -442,7 +442,6 @@ malloc_fail:
 		peer->peer_reqs[i].priv = NULL;
 	}
 	peer->interactive_func = NULL;
-	peerd_start_threads(peer);
 	return peer;
 }
@@ -508,6 +507,8 @@ int main(int argc, const char *argv[])
 	//TODO err check
 	peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
 	r = custom_peer_init(peer, argc, argv);
-//	peerd_start_threads(peer);
+	if (r < 0)
+		return -1;
+	peerd_start_threads(peer);
 	return peerd_loop(peer);
 }
diff --git a/xseg/peers/user/mt-mapperd.c b/xseg/peers/user/mt-mapperd.c
index 387f62a..b85f987 100644
--- a/xseg/peers/user/mt-mapperd.c
+++ b/xseg/peers/user/mt-mapperd.c
@@ -12,57 +12,57 @@
 #include
 #include
-#define PENDING 1;
+#define MF_PENDING 1
-
-#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1) // hex representation of sha256 value takes up double the sha256 size
+/* hex representation of sha256 value takes up double the sha256 size */
+#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1)
 #define block_size (1<<20)
-#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) //(transparency byte + max object len)
-#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) //magic + volume size
+#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) /* transparency byte + max object len */
+#define mapheader_size
(SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */ #define MF_OBJECT_EXIST (1 << 0) #define MF_OBJECT_COPYING (1 << 1) char *magic_string = "This a magic string. Please hash me"; -unsigned char magic_sha256[SHA256_DIGEST_SIZE]; -char zero_block[SHA256_DIGEST_SIZE * 2 + 1]; +unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */ +char zero_block[SHA256_DIGEST_SIZE * 2 + 1]; /* hexlified sha256 hash value of a block full of zeros */ struct map_node { - struct xlock lock; uint32_t flags; - uint32_t objectlen; uint32_t objectidx; - char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */ - struct xq pending; /* pending peer_reqs on this object */ + uint32_t objectlen; + char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */ + struct xq pending; /* pending peer_reqs on this object */ }; -#define MF_MAP_LOADING (1 << 0) -#define MF_MAP_ABORT (1 << 1) +#define MF_MAP_LOADING (1 << 0) +#define MF_MAP_DESTROYED (1 << 1) struct map { uint32_t flags; uint64_t size; uint32_t volumelen; char volume[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */ - struct xlock lock; - xhash_t *objects; // obj_index --> map_node - struct xq pending; /* pending peer_reqs on this map */ + xhash_t *objects; /* obj_index --> map_node */ + struct xq pending; /* pending peer_reqs on this map */ }; struct mapperd { xport bportno; - struct xlock maps_lock; xhash_t *hashmaps; // hash_function(target) --> struct map }; struct mapper_io { - struct xlock lock; - volatile uint32_t copyups; - xhash_t *copyups_nodes; - int err; + volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */ + xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/ + int err; /* error flag */ }; +/* + * Helper functions + */ + static inline struct mapperd * __get_mapperd(struct peerd *peer) { return (struct mapperd *) peer->priv; @@ -73,7 +73,35 @@ static inline struct mapper_io * __get_mapper_io(struct peer_req *pr) return (struct mapper_io *) pr->priv; } -static struct map * __find_map(struct mapperd *mapper, char *target, uint32_t targetlen) +static inline uint64_t calc_map_obj(struct map *map) +{ + uint64_t nr_objs = map->size / block_size; + if (map->size % block_size) + nr_objs++; + return nr_objs; +} + +static uint32_t calc_nr_obj(struct xseg_request *req) +{ + unsigned int r = 1; + uint64_t rem_size = req->size; + uint64_t obj_offset = req->offset & (block_size -1); //modulo + uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size; + rem_size -= obj_size; + while (rem_size > 0) { + obj_size = (rem_size - block_size > 0) ? 
block_size : rem_size; + rem_size -= obj_size; + r++; + } + + return r; +} + +/* + * Maps handling functions + */ + +static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen) { int r; struct map *m = NULL; @@ -84,28 +112,16 @@ static struct map * __find_map(struct mapperd *mapper, char *target, uint32_t ta r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m); if (r < 0) return NULL; - print_map(m); return m; } -static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen) -{ - struct map *m = NULL; - - xlock_acquire(&mapper->maps_lock, 1); - m = __find_map(mapper, target, targetlen); - xlock_release(&mapper->maps_lock); - - return m; -} static int insert_map(struct mapperd *mapper, struct map *map) { int r = -1; - xlock_acquire(&mapper->maps_lock, 1); - if (__find_map(mapper, map->volume, map->volumelen)){ - printf("map found in insert map\n"); + if (find_map(mapper, map->volume, map->volumelen)){ + //printf("map found in insert map\n"); goto out; } @@ -119,7 +135,6 @@ static int insert_map(struct mapperd *mapper, struct map *map) r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map); } out: - xlock_release(&mapper->maps_lock); return r; } @@ -127,7 +142,6 @@ static int remove_map(struct mapperd *mapper, struct map *map) { int r = -1; - xlock_acquire(&mapper->maps_lock, 1); //assert no pending pr on map r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume); @@ -140,7 +154,6 @@ static int remove_map(struct mapperd *mapper, struct map *map) r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume); } out: - xlock_release(&mapper->maps_lock); return r; } @@ -151,9 +164,8 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3 xport p; struct xseg_request *req; struct mapperd *mapper = __get_mapperd(peer); - //struct peerd *peer = __get_peerd(mapper); void *dummy; - printf("Loading map\n"); + //printf("Loading map\n"); struct map *m = find_map(mapper, target, targetlen); if (!m) { @@ -170,22 +182,18 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3 goto out_map; } m->objects = xhash_new(3, INTEGER); //FIXME err_check; + if (!m->objects) + goto out_q; __xq_append_tail(&m->pending, (xqindex) pr); - xlock_release(&m->lock); } else { goto map_exists; } r = insert_map(mapper, m); - if (r < 0) { // someone beat us (or resize error) - xq_free(&m->pending); - free(m); - m = find_map(mapper, target, targetlen); - if (!m) - goto out_err; - goto map_exists; - } - printf("Loading map: preparing req\n"); + if (r < 0) + goto out_hash; + + //printf("Loading map: preparing req\n"); req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC); if (!req) @@ -210,7 +218,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3 goto out_unset; r = xseg_signal(peer->xseg, p); - printf("Loading map: request issued\n"); + //printf("Loading map: request issued\n"); return 0; out_unset: @@ -219,15 +227,15 @@ out_put: xseg_put_request(peer->xseg, req, peer->portno); out_fail: - //remove m from maps - //fail pending reqs; //FIXME possible race if someone got map but not appended pr to pending yet - // probably refcount with get/put will help remove_map(mapper, m); xqindex idx; - while((idx = xq_pop_head(&m->pending, 1)) != Noneidx) { + while((idx = __xq_pop_head(&m->pending)) != Noneidx) { fail(peer, (struct peer_req *) idx); } -//out_q: + +out_hash: + xhash_free(m->objects); +out_q: xq_free(&m->pending); out_map: free(m); 
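/*
 * Illustrative sketch (not part of this patch): the offset/size arithmetic
 * used by calc_nr_obj() above and req2objs() further down splits a request
 * into per-object segments. The first segment is trimmed to the end of its
 * object, middle segments cover whole objects, and the last segment takes
 * whatever remains. A minimal standalone version of that split, reusing the
 * same block_size value defined in this file:
 */
#include <stdio.h>
#include <stdint.h>

#define block_size (1<<20)

static void split_request_sketch(uint64_t offset, uint64_t size)
{
	uint64_t obj_index = offset / block_size;
	uint64_t obj_offset = offset & (block_size - 1);	/* modulo */
	uint64_t rem_size = size;
	/* same behaviour as calc_nr_obj(): a request no larger than
	 * block_size is assumed to stay within a single object */
	uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size;

	while (1) {
		printf("obj %llu: offset %llu, size %llu\n",
				(unsigned long long) obj_index,
				(unsigned long long) obj_offset,
				(unsigned long long) obj_size);
		rem_size -= obj_size;
		if (!rem_size)
			break;
		obj_index++;
		obj_offset = 0;
		obj_size = (rem_size > block_size) ? block_size : rem_size;
	}
}

/*
 * Example: offset = 1.5 MiB, size = 2 MiB yields three segments:
 * obj 1: offset 524288, size 524288; obj 2: offset 0, size 1048576;
 * obj 3: offset 0, size 524288.
 */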
@@ -235,48 +243,44 @@ out_err: return -1; map_exists: - xlock_acquire(&m->lock, 1); + //assert map loading when this is reached if (m->flags & MF_MAP_LOADING) { __xq_append_tail(&m->pending, (xqindex) pr); - xlock_release(&m->lock); } else { - xlock_release(&m->lock); dispatch(peer, pr, pr->req); } return 0; } -#define MAP_LOADING 1 -static int find_or_load_map(struct peer* peer, struct peer_req *pr, +static int find_or_load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen, struct map **m) { struct mapperd *mapper = __get_mapperd(peer); int r; - printf("find map or load\n"); + //printf("find map or load\n"); *m = find_map(mapper, target, targetlen); if (*m) { - printf("map found\n"); - xlock_acquire(&((*m)->lock), 1); + //printf("map found\n"); if ((*m)->flags & MF_MAP_LOADING) { __xq_append_tail(&(*m)->pending, (xqindex) pr); - xlock_release(&((*m)->lock)); - printf("Map loading\n"); - return MAP_LOADING; + //printf("Map loading\n"); + return MF_PENDING; } else { - printf("Map returned\n"); - xlock_release(&((*m)->lock)); + //printf("Map returned\n"); return 0; } } r = load_map(peer, pr, target, targetlen); if (r < 0) return -1; //error - return MAP_LOADING; + return MF_PENDING; } - +/* + * Object handling functions + */ struct map_node *find_object(struct map *map, uint64_t obj_index) { @@ -301,6 +305,34 @@ static int insert_object(struct map *map, struct map_node *mn) return r; } + +/* + * map read/write functions + */ +static inline void pithosmap_to_object(struct map_node *mn, char *buf) +{ + int i; + //hexlify sha256 value + for (i = 0; i < SHA256_DIGEST_SIZE; i++) { + sprintf(mn->object, "%02x", buf[i]); + } + + mn->object[XSEG_MAX_TARGET_LEN] = 0; + mn->objectlen = strlen(mn->object); + mn->flags = 0; +} + +static inline void map_to_object(struct map_node *mn, char *buf) +{ + char c = buf[0]; + mn->flags = 0; + if (c) + mn->flags |= MF_OBJECT_EXIST; + memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN); + mn->object[XSEG_MAX_TARGET_LEN] = 0; + mn->objectlen = strlen(mn->object); +} + static inline void object_to_map(char* buf, struct map_node *mn) { buf[0] = (mn->flags & MF_OBJECT_EXIST)? 
1 : 0; @@ -308,14 +340,20 @@ static inline void object_to_map(char* buf, struct map_node *mn) memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGET_LEN - mn->objectlen); //zero out the rest of the buffer } -static int object_write(struct peerd *peer, struct peer_req *pr, - struct map *map, uint64_t objidx) +static inline void mapheader_to_map(struct map *m, char *buf) +{ + uint64_t pos = 0; + memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE); + pos += SHA256_DIGEST_SIZE; + memcpy(buf + pos, &m->size, sizeof(m->size)); + pos += sizeof(m->size); +} + + +static int object_write(struct peerd *peer, struct peer_req *pr, struct map_node *mn) { void *dummy; struct mapperd *mapper = __get_mapperd(peer); - struct map_node *mn = find_object(map, objidx); - if (!mn) - goto out_err; struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC); if (!req) @@ -326,7 +364,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr, char *target = xseg_get_target(peer->xseg, req); strncpy(target, mn->object, mn->objectlen); req->size = objectsize_in_map; - req->offset = mapheader_size + objidx * objectsize_in_map; + req->offset = mapheader_size + mn->objectidx * objectsize_in_map; req->op = X_WRITE; char *data = xseg_get_data(peer->xseg, req); object_to_map(data, mn); @@ -339,7 +377,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr, goto out_unset; r = xseg_signal(peer->xseg, p); - return PENDING; + return MF_PENDING; out_unset: xseg_get_req_data(peer->xseg, req, &dummy); @@ -349,21 +387,12 @@ out_err: return -1; } -static inline void mapheader_to_map(struct map *m, char *buf) -{ - uint64_t pos = 0; - memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE); - pos += SHA256_DIGEST_SIZE; - memcpy(buf + pos, &m->size, sizeof(m->size)); - pos += sizeof(m->size); -} - static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map) { void *dummy; struct mapperd *mapper = __get_mapperd(peer); struct map_node *mn; - uint64_t i, pos, max_objidx = map->size / block_size; + uint64_t i, pos, max_objidx = calc_map_obj(map); struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC); if (!req) @@ -392,7 +421,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map) if (p == NoPort) goto out_unset; r = xseg_signal(peer->xseg, p); - return PENDING; + return MF_PENDING; out_unset: xseg_get_req_data(peer->xseg, req, &dummy); @@ -402,6 +431,65 @@ out_err: return -1; } +static int read_map (struct peerd *peer, struct map *map, char *buf) +{ + char nulls[SHA256_DIGEST_SIZE]; + memset(nulls, 0, SHA256_DIGEST_SIZE); + + int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE); + if (r) { + //read error; + return -1; + } + //type 1, our type, type 0 pithos map + int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE); + uint64_t pos; + uint64_t i, nr_objs; + struct map_node *map_node; + if (type) { + pos = SHA256_DIGEST_SIZE; + map->size = *(uint64_t *) (buf + pos); + pos += sizeof(uint64_t); + nr_objs = map->size / block_size; + if (map->size % block_size) + nr_objs++; + map_node = calloc(nr_objs, sizeof(struct map_node)); + if (!map_node) + return -1; + + for (i = 0; i < nr_objs; i++) { + map_node[i].objectidx = i; + xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check + map_to_object(&map_node[i], buf + pos); + pos += objectsize_in_map; + r = insert_object(map, &map_node[i]); //FIXME error check + } + } else { + pos = 0; + uint64_t max_nr_objs = 
block_size/SHA256_DIGEST_SIZE; + map_node = calloc(max_nr_objs, sizeof(struct map_node)); + if (!map_node) + return -1; + for (i = 0; i < max_nr_objs; i++) { + if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE)) + break; + map_node[i].objectidx = i; + xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check + pithosmap_to_object(&map_node[i], buf + pos); + pos += SHA256_DIGEST_SIZE; + r = insert_object(map, &map_node[i]); //FIXME error check + } + map->size = i * block_size; + } + return 0; + + //FIXME cleanup on error +} + +/* + * copy up functions + */ + static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn) { int r = 0; @@ -440,7 +528,6 @@ static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_re return mn; } -// mn->lock held, static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr) { struct mapperd *mapper = __get_mapperd(peer); @@ -450,11 +537,12 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re xport p; struct sha256_ctx sha256ctx; uint32_t newtargetlen; - char new_target[XSEG_MAX_TARGET_LEN]; + char new_target[XSEG_MAX_TARGET_LEN + 1]; unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN - char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding + char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx strncpy(new_object, mn->object, mn->objectlen); sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination + new_object[XSEG_MAX_TARGET_LEN + 19] = 0; /* calculate new object name */ @@ -463,7 +551,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re sha256_finish_ctx(&sha256ctx, buf); for (i = 0; i < SHA256_DIGEST_SIZE; ++i) sprintf (new_target + 2*i, "%02x", buf[i]); - newtargetlen = SHA256_DIGEST_SIZE; + newtargetlen = SHA256_DIGEST_SIZE * 2; struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, @@ -476,12 +564,11 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re goto out_put; char *target = xseg_get_target(peer->xseg, req); - strncpy(target, mn->object, mn->objectlen); - target[mn->objectlen] = 0; + strncpy(target, new_target, newtargetlen); struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req); - strncpy(xcopy->target, new_target, newtargetlen); - xcopy->target[newtargetlen] = 0; + strncpy(xcopy->target, mn->object, mn->objectlen); + xcopy->target[mn->objectlen] = 0; req->offset = 0; req->size = block_size; @@ -509,77 +596,9 @@ out_put: } -static inline void pithosmap_to_object(struct map_node *mn, char *buf) -{ - int i; - //hexlify sha256 value - for (i = 0; i < SHA256_DIGEST_SIZE; i++) { - sprintf(mn->object, "%02x", buf[i]); - } - - mn->object[XSEG_MAX_TARGET_LEN] = 0; - mn->objectlen = strlen(mn->object); - mn->flags = 0; -} - -static inline void map_to_object(struct map_node *mn, char *buf) -{ - char c = buf[0]; - mn->flags = 0; - if (c) - mn->flags |= MF_OBJECT_EXIST; - memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN); - mn->object[XSEG_MAX_TARGET_LEN] = 0; - mn->objectlen = strlen(mn->object); -} - - -static int read_map (struct peerd *peer, struct map *map, char *buf) -{ - //type 1, our type, type 0 pithos map - int r, type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE); - uint64_t pos; - uint64_t i, nr_objs; - struct map_node *map_node; - if (type) { - 
pos = SHA256_DIGEST_SIZE; - map->size = *(uint64_t *) (buf + pos); - pos += sizeof(uint64_t); - nr_objs = map->size / block_size; - if (map->size % block_size) - nr_objs++; - map_node = calloc(nr_objs, sizeof(struct map_node)); - if (!map_node) - return -1; - - for (i = 0; i < nr_objs; i++) { - map_node[i].objectidx = i; - xlock_release(&map_node[i].lock); - xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check - map_to_object(&map_node[i], buf + pos); - pos += objectsize_in_map; - r = insert_object(map, &map_node[i]); //FIXME error check - } - } else { - pos = 0; - uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE; - map_node = calloc(max_nr_objs, sizeof(struct map_node)); - if (!map_node) - return -1; - for (i = 0; i < max_nr_objs; i++) { - if (!memcmp(buf+pos, "0000000000000000", SHA256_DIGEST_SIZE)) - break; - map_node[i].objectidx = i; - xlock_release(&map_node[i].lock); - xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check - pithosmap_to_object(&map_node[i], buf + pos); - pos += SHA256_DIGEST_SIZE; - r = insert_object(map, &map_node[i]); //FIXME error check - } - map->size = i * block_size; - } - return 0; -} +/* + * request handling functions + */ static int handle_mapread(struct peerd *peer, struct peer_req *pr, struct xseg_request *req) @@ -600,38 +619,30 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr, char *data = xseg_get_data(peer->xseg, req); r = read_map(peer, map, data); if (r < 0) - goto out_err; + goto out_fail; xseg_put_request(peer->xseg, req, peer->portno); - xlock_acquire(&map->lock, 1); map->flags &= ~MF_MAP_LOADING; while((idx = __xq_pop_head(&map->pending)) != Noneidx){ struct peer_req *preq = (struct peer_req *) idx; - xlock_release(&map->lock); dispatch(peer, preq, preq->req); - xlock_acquire(&map->lock, 1); } - xlock_release(&map->lock); return 0; out_fail: - xlock_acquire(&map->lock, 1); + xseg_put_request(peer->xseg, req, peer->portno); map->flags &= ~MF_MAP_LOADING; - map->flags |= MF_MAP_ABORT; while((idx = __xq_pop_head(&map->pending)) != Noneidx){ struct peer_req *preq = (struct peer_req *) idx; - xlock_release(&map->lock); fail(peer, preq); - xlock_acquire(&map->lock, 1); } - xlock_release(&map->lock); remove_map(mapper, map); free(map); return 0; out_err: xseg_put_request(peer->xseg, req, peer->portno); - goto out_fail; + return -1; } static int handle_clone(struct peerd *peer, struct peer_req *pr, @@ -640,20 +651,22 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr, struct mapperd *mapper = __get_mapperd(peer); struct mapper_io *mio = __get_mapper_io(pr); (void) mio; - printf("handle clone 1\n"); struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req); if (!xclone) { goto out_err; } - printf("handle clone 2\n"); struct map *map; int r = find_or_load_map(peer, pr, xclone->target, strlen(xclone->target), &map); if (r < 0) goto out_err; - else if (r == MAP_LOADING) + else if (r == MF_PENDING) return 0; + + if (map->flags & MF_MAP_DESTROYED) { + fail(peer, pr); + return 0; + } - printf("handle clone 3\n"); //alloc and init struct map struct map *clonemap = malloc(sizeof(struct map)); if (!clonemap) { @@ -666,7 +679,6 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr, xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops); if (!qidx) goto out_err_objhash; - xlock_release(&clonemap->lock); clonemap->size = xclone->size; clonemap->flags = 0; char *target = 
xseg_get_target(peer->xseg, pr->req); @@ -682,24 +694,27 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr, } int i; for (i = 0; i < xclone->size/block_size + 1; i++) { - strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //FIXME copy object name from father - map_nodes[i].objectlen = strlen(zero_block); + struct map_node *mn = find_object(map, i); + if (mn) { + strncpy(map_nodes[i].object, mn->object, mn->objectlen); + map_nodes[i].objectlen = mn->objectlen; + } else { + strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); + map_nodes[i].objectlen = strlen(zero_block); + } map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate map_nodes[i].flags = 0; map_nodes[i].objectidx = i; - xlock_release(&map_nodes[i].lock); - r = insert_object(map, &map_nodes[i]); + xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops); + r = insert_object(clonemap, &map_nodes[i]); if (r < 0) goto out_free_all; } //insert map - printf("handle clone 4\n"); r = insert_map(mapper, clonemap); if ( r < 0) { - printf("handle clone 6\n"); goto out_free_all; } - printf("handle clone 5\n"); complete(peer, pr); return 0; @@ -718,21 +733,6 @@ out_err: return -1; } -static uint32_t calc_nr_obj(struct xseg_request *req) -{ - unsigned int r = 1; - uint64_t rem_size = req->size; - uint64_t obj_offset = req->offset & (block_size -1); //modulo - uint64_t obj_size = block_size - obj_offset; - while (rem_size > 0) { - obj_size = (rem_size - block_size > 0) ? block_size : rem_size; - rem_size -= obj_size; - r++; - } - - return r; -} - static int req2objs(struct peerd *peer, struct peer_req *pr, struct map *map, int write) { @@ -746,6 +746,7 @@ static int req2objs(struct peerd *peer, struct peer_req *pr, strncpy(buf, target, pr->req->targetlen); int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size); if (r < 0) { + printf("couldn't resize req\n"); return -1; } target = xseg_get_target(peer->xseg, pr->req); @@ -759,26 +760,27 @@ static int req2objs(struct peerd *peer, struct peer_req *pr, uint64_t rem_size = pr->req->size; uint64_t obj_index = pr->req->offset / block_size; uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo - uint64_t obj_size = block_size - obj_offset; + uint64_t obj_size = (rem_size > block_size) ? 
block_size - obj_offset : rem_size; struct map_node * mn = find_object(map, obj_index); if (!mn) { + printf("coudn't find obj_index\n"); goto out_err; } - xlock_acquire(&mn->lock, 1); - if (mn->flags & MF_OBJECT_COPYING) + if (write && mn->flags & MF_OBJECT_COPYING) goto out_object_copying; if (write && !(mn->flags & MF_OBJECT_EXIST)) { //calc new_target, copy up object r = copyup_object(peer, mn, pr); - if (r < 0) + if (r < 0) { + printf("err_copy\n"); goto out_err_copy; + } mn->flags |= MF_OBJECT_COPYING; goto out_object_copying; } strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?); reply->segs[idx].target[mn->objectlen] = 0; - xlock_release(&mn->lock); reply->segs[idx].offset = obj_offset; reply->segs[idx].size = obj_size; rem_size -= obj_size; @@ -790,22 +792,23 @@ static int req2objs(struct peerd *peer, struct peer_req *pr, rem_size -= obj_size; mn = find_object(map, obj_index); if (!mn) { + printf("coudn't find obj_index\n"); goto out_err; } - xlock_acquire(&mn->lock, 1); - if (mn->flags & MF_OBJECT_COPYING) + if (write && mn->flags & MF_OBJECT_COPYING) goto out_object_copying; if (write && !(mn->flags & MF_OBJECT_EXIST)) { //calc new_target, copy up object r = copyup_object(peer, mn, pr); - if (r < 0) + if (r < 0) { + printf("err_copy\n"); goto out_err_copy; + } mn->flags |= MF_OBJECT_COPYING; goto out_object_copying; } strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?); reply->segs[idx].target[mn->objectlen] = 0; - xlock_release(&mn->lock); reply->segs[idx].offset = obj_offset; reply->segs[idx].size = obj_size; } @@ -813,12 +816,12 @@ static int req2objs(struct peerd *peer, struct peer_req *pr, return 0; out_object_copying: - __xq_append_tail(&mn->pending, (xqindex) pr); - xlock_release(&mn->lock); - return PENDING; + //printf("r2o mn: %lx\n", mn); + if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx) + printf("couldn't append pr to tail\n"); + return MF_PENDING; out_err_copy: - xlock_release(&mn->lock); out_err: return -1; } @@ -837,13 +840,19 @@ static int handle_mapr(struct peerd *peer, struct peer_req *pr, fail(peer, pr); return -1; } - else if (r == MAP_LOADING) + else if (r == MF_PENDING) return 0; + if (map->flags & MF_MAP_DESTROYED) { + fail(peer, pr); + return 0; + } + //get_object r = req2objs(peer, pr, map, 0); - if (r < 0) + if (r < 0){ fail(peer, pr); + } else if (r == 0) complete(peer, pr); @@ -859,47 +868,39 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr, (void) mapper; struct mapper_io *mio = __get_mapper_io(pr); int r = 0; - xlock_acquire(&mio->lock, 1); + //printf("handle copyup reply\n"); if (req->state & XS_FAILED && !(req->state & XS_SERVED)) { + //printf("copy up failed\n"); mio->err = 1; r = 1; } struct map_node *mn = __get_copyup_node(mio, req); - if (!mn) + if (!mn){ + //printf("copy up mn not found\n"); mio->err =1; //BUG + } else { - xlock_acquire(&mn->lock, 1); + //printf("mn: %lx\n", mn); mn->flags &= ~MF_OBJECT_COPYING; - mn->flags |= MF_OBJECT_EXIST; if (!r) { - struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req); - strncpy(mn->object, xcopy->target, strlen(xcopy->target)); + mn->flags |= MF_OBJECT_EXIST; + char *target = xseg_get_target(peer->xseg, req); + strncpy(mn->object, target, req->targetlen); } - xlock_release(&mn->lock); } __set_copyup_node(mio, req, NULL); xseg_put_request(peer->xseg, req, peer->portno); mio->copyups--; - if (!mio->copyups) { - if (mio->err) - fail(peer, pr); - else - 
complete(peer, pr); - } - xlock_release(&mio->lock); - if (mn) { //handle peer_requests waiting on copy up xqindex idx; - xlock_acquire(&mn->lock, 1); - while ((idx = __xq_pop_head(&mn->pending) != Noneidx)){ - xlock_release(&mn->lock); + //printf("foo\n"); + while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){ + //printf("dispatching pending\n"); struct peer_req * preq = (struct peer_req *) idx; dispatch(peer, preq, preq->req); - xlock_acquire(&mn->lock, 1); } - xlock_release(&mn->lock); } return 0; @@ -922,13 +923,27 @@ static int handle_mapw(struct peerd *peer, struct peer_req *pr, fail(peer, pr); return -1; } - else if (r == MAP_LOADING) + else if (r == MF_PENDING) return 0; + + if (map->flags & MF_MAP_DESTROYED) { + printf("map MF_MAP_DESTROYED req %lx\n", pr->req); + fail(peer, pr); + return 0; + } + if (mio->err) { + //printf("mapw failed\n"); + fail(peer, pr); + return 0; + } + //printf("handle mapw\n"); - + mio->err = 0; r = req2objs(peer, pr, map, 1); - if (r < 0) + if (r < 0){ + printf("req2obj returned r < 0 for req %lx\n", pr->req); fail(peer, pr); + } if (r == 0) complete(peer, pr); //else copyup pending, wait for pr restart @@ -950,28 +965,50 @@ static int handle_info(struct peerd *peer, struct peer_req *pr, struct mapper_io *mio = __get_mapper_io(pr); (void) mio; char *target = xseg_get_target(peer->xseg, pr->req); - if (!target) - return -1; - printf("Handle info\n"); + if (!target) { + fail(peer, pr); + return 0; + } + //printf("Handle info\n"); struct map *map; int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map); if (r < 0) { fail(peer, pr); return -1; } - else if (r == MAP_LOADING) + else if (r == MF_PENDING) + return 0; + if (map->flags & MF_MAP_DESTROYED) { + fail(peer, pr); return 0; - else { - struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req); - xinfo->size = map->size; - complete(peer, pr); } + + struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req); + xinfo->size = map->size; + complete(peer, pr); + return 0; } static int handle_destroy(struct peerd *peer, struct peer_req *pr, struct xseg_request *req) { + /* + struct map *map; + int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map); + if (r < 0) { + fail(peer, pr); + return -1; + } + else if (r == MF_PENDING) + return 0; + map->flags |= MF_MAP_DESTROYED; + */ + //delete map block + //do not delete all objects + //remove_map(mapper, map); + //free(map, map_nodes, all allocated resources); + //complete(peer, pr); fail(peer, pr); return 0; } @@ -1008,10 +1045,13 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[]) unsigned char buf[SHA256_DIGEST_SIZE]; char *zero; struct sha256_ctx sha256ctx; + /* calculate out magic sha hash value */ sha256_init_ctx(&sha256ctx); sha256_process_bytes(magic_string, strlen(magic_string), &sha256ctx); sha256_finish_ctx(&sha256ctx, magic_sha256); + /* calculate zero block */ + //FIXME check hash value zero = malloc(block_size); memset(zero, 0, block_size); sha256_init_ctx(&sha256ctx); @@ -1021,14 +1061,13 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[]) sprintf(zero_block + 2*i, "%02x", buf[i]); printf("%s \n", zero_block); + //FIXME error checks struct mapperd *mapper = malloc(sizeof(struct mapperd)); - xlock_release(&mapper->maps_lock); mapper->hashmaps = xhash_new(3, STRING); peer->priv = mapper; for (i = 0; i < peer->nr_ops; i++) { struct mapper_io *mio = malloc(sizeof(struct mapper_io)); - xlock_release(&mio->lock); 
 	mio->copyups_nodes = xhash_new(3, INTEGER);
 	mio->copyups = 0;
 	mio->err = 0;
@@ -1041,9 +1080,19 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
 			i += 1;
 			continue;
 		}
+		/* enforce only one thread */
+		if (!strcmp(argv[i], "-t") && (i+1) < argc){
+			int t = atoi(argv[i+1]);
+			if (t != 1) {
+				printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
+				return -1;
+			}
+			i += 1;
+			continue;
+		}
 	}
-	test_map(peer);
+	//test_map(peer);
 	return 0;
 }
@@ -1063,12 +1112,14 @@ void print_map(struct map *m)
 			m->volume, m->volumelen, m->size, nr_objs);
 	uint64_t i;
 	struct map_node *mn;
-	if (nr_objs > 1000000)
+	if (nr_objs > 1000000) //FIXME to protect against invalid volume size
 		return;
 	for (i = 0; i < nr_objs; i++) {
 		mn = find_object(m, i);
-		if (!mn)
+		if (!mn){
+			printf("object idx [%llu] not found!\n", i);
 			continue;
+		}
 		print_obj(mn);
 	}
 }
diff --git a/xseg/peers/user/xseg-tool.c b/xseg/peers/user/xseg-tool.c
index 0275306..9a1ff94 100644
--- a/xseg/peers/user/xseg-tool.c
+++ b/xseg/peers/user/xseg-tool.c
@@ -314,6 +314,7 @@ int cmd_write(char *target, uint64_t offset)
 		fprintf(stderr, "Cannot submit\n");
 		return -1;
 	}
+	xseg_signal(xseg, p);
 	return 0;
 }
@@ -985,6 +986,9 @@ void handle_reply(struct xseg_request *req)
 		break;
 	case X_WRITE:
+		fprintf(stdout, "wrote: ");
+		fwrite(req_data, 1, req->datalen, stdout);
+		break;
 	case X_SYNC:
 	case X_DELETE:
 	case X_TRUNCATE:
@@ -1009,6 +1013,7 @@ int cmd_wait(uint32_t nr)
 {
 	struct xseg_request *req;
 	long ret;
+	init_local_signal();
 	for (;;) {
 		req = xseg_receive(xseg, srcport);
diff --git a/xseg/xseg/xseg.c b/xseg/xseg/xseg.c
index e4102c5..014f003 100644
--- a/xseg/xseg/xseg.c
+++ b/xseg/xseg/xseg.c
@@ -1162,8 +1162,13 @@ int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
 int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
 			uint32_t new_targetlen, uint64_t new_datalen)
 {
-	if (req->bufferlen >= new_datalen + new_targetlen)
+	if (req->bufferlen >= new_datalen + new_targetlen) {
+		req->data = req->buffer;
+		req->target = req->buffer + req->bufferlen - new_targetlen;
+		req->datalen = new_datalen;
+		req->targetlen = new_targetlen;
 		return 0;
+	}
 	if (req->buffer){
 		void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
diff --git a/xseg/xtypes/xheap.c b/xseg/xtypes/xheap.c
index 399fd72..a747910 100644
--- a/xseg/xtypes/xheap.c
+++ b/xseg/xtypes/xheap.c
@@ -103,6 +103,7 @@ alloc:
 //	printf("after heap->cur: %llu\n", heap->cur);
 	h = (struct xheap_header *) (((unsigned long) mem) + heap->cur);
 	h->size = bytes - sizeof(struct xheap_header);
+	h->magic = 0xdeadbeaf;
 	XPTRSET(&h->heap, heap);
 	heap->cur += bytes;
@@ -145,6 +146,9 @@ void xheap_free(void *ptr)
 {
 	struct xheap_header *h = __get_header(ptr);
 	struct xheap *heap = XPTR(&h->heap);
+	if (h->magic != 0xdeadbeaf) {
+		XSEGLOG("for ptr: %lx, magic %lx != 0xdeadbeaf", ptr, h->magic);
+	}
 	void *mem = XPTR(&heap->mem);
 	uint64_t size = xheap_get_chunk_size(ptr);
 	xptr *free_list = (xptr *) mem;
diff --git a/xseg/xtypes/xheap.h b/xseg/xtypes/xheap.h
index c4490ab..dd2139d 100644
--- a/xseg/xtypes/xheap.h
+++ b/xseg/xtypes/xheap.h
@@ -5,6 +5,7 @@
 #include
 struct xheap_header {
+	uint64_t magic;
 	XPTR_TYPE(struct xheap) heap;
 	uint64_t size;
 };
-- 
1.7.10.4
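/*
 * Illustrative sketch (not part of this patch): the new magic field in
 * struct xheap_header gives xheap_free() a cheap way to detect pointers that
 * were never returned by the xheap allocator, or whose header has been
 * overwritten. The general pattern, shown here with a hypothetical
 * stand-alone header layout rather than the real xheap one:
 */
#include <stdint.h>
#include <stdio.h>

#define CHUNK_MAGIC 0xdeadbeafULL	/* same value the patch stores on allocation */

struct chunk_header {			/* hypothetical stand-in for struct xheap_header */
	uint64_t magic;
	uint64_t size;
};

/* the header is assumed to sit immediately before the user pointer */
static struct chunk_header *chunk_header_of(void *ptr)
{
	return (struct chunk_header *) ptr - 1;
}

static int chunk_check(void *ptr)
{
	struct chunk_header *h = chunk_header_of(ptr);
	if (h->magic != CHUNK_MAGIC) {
		fprintf(stderr, "bogus chunk %p: magic %llx\n",
				ptr, (unsigned long long) h->magic);
		return -1;
	}
	return 0;
}

/*
 * Note that xheap_free() in the patch only logs a mismatch through XSEGLOG
 * and still frees the chunk; a stricter variant could skip the free instead.
 */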