fix various bugs in mt-mapperd.
authorFilippos Giannakos <philipgian@grnet.gr>
Wed, 19 Sep 2012 16:43:04 +0000 (19:43 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Wed, 19 Sep 2012 16:43:04 +0000 (19:43 +0300)
also add copy to filed and magic in xheap header

xseg/peers/user/filed.c
xseg/peers/user/mpeer.c
xseg/peers/user/mt-mapperd.c
xseg/peers/user/xseg-tool.c
xseg/xseg/xseg.c
xseg/xtypes/xheap.c
xseg/xtypes/xheap.h

index 61c483b..0be531b 100644 (file)
@@ -12,6 +12,9 @@
 #include <limits.h>
 #include <xseg/xseg.h>
 #include <pthread.h>
+#include <xseg/protocol.h>
+#include <sys/sendfile.h>
+
 
 #define MAX_PATH_SIZE 255
 #define MAX_FILENAME_SIZE 255
@@ -423,6 +426,49 @@ static void handle_info(struct store *store, struct io *io)
        complete(store, io);
 }
 
+static void handle_copy(struct store *store, struct io *io)
+{
+        struct xseg_request *req = io->req;
+        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(store->xseg, req);
+        struct stat st;
+        int n, src, dst;
+       char *target = xseg_get_target(store->xseg, req);
+
+        dst = dir_open(store, io, target, req->targetlen, 1);
+        if (dst < 0) {
+                fprintf(stderr, "fail in dst\n");
+                fail(store, io);
+                return;
+        }
+
+       src = openat(store->dirfd, xcopy->target, O_RDWR);      
+        if (src < 0) {
+                fprintf(stderr, "fail in src\n");
+                fail(store, io);
+                return;
+        }
+
+        fstat(src, &st);
+        n = sendfile(dst, src, 0, st.st_size);
+        if (n != st.st_size) {
+                fprintf(stderr, "fail in copy\n");
+                fail(store, io);
+                goto out;
+        }
+
+        if (n < 0) {
+                fprintf(stderr, "fail in cp\n");
+                fail(store, io);
+                goto out;
+        }
+
+        complete(store, io);
+
+out:
+        close(src);
+}
+
+
 static void dispatch(struct store *store, struct io *io)
 {
        if (verbose)
@@ -434,6 +480,8 @@ static void dispatch(struct store *store, struct io *io)
                handle_read_write(store, io); break;
        case X_INFO:
                handle_info(store, io); break;
+       case X_COPY:
+               handle_copy(store, io); break;
        case X_SYNC:
        default:
                handle_unknown(store, io);
index 09d768e..dbad826 100644 (file)
@@ -442,7 +442,6 @@ malloc_fail:
                peer->peer_reqs[i].priv = NULL;
        }
        peer->interactive_func = NULL;
-       peerd_start_threads(peer);
        return peer;
 }
 
@@ -508,6 +507,8 @@ int main(int argc, const char *argv[])
        //TODO err check
        peer = peerd_init(nr_ops, spec, portno, nr_threads, defer_portno);
        r = custom_peer_init(peer, argc, argv);
-//     peerd_start_threads(peer);
+       if (r < 0)
+               return -1;
+       peerd_start_threads(peer);
        return peerd_loop(peer);
 }
index 387f62a..b85f987 100644 (file)
 #include <sys/stat.h>
 #include <fcntl.h>
 
-#define PENDING 1;
+#define MF_PENDING 1
 
-
-#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1) // hex representation of sha256 value takes up double the sha256 size
+/* hex representation of sha256 value takes up double the sha256 size */
+#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1)
 
 #define block_size (1<<20)
-#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) //(transparency byte + max object len)
-#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) //magic + volume size 
+#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) /* transparency byte + max object len */
+#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
 
 #define MF_OBJECT_EXIST                (1 << 0)
 #define MF_OBJECT_COPYING      (1 << 1)
 
 char *magic_string = "This a magic string. Please hash me";
-unsigned char magic_sha256[SHA256_DIGEST_SIZE];
-char zero_block[SHA256_DIGEST_SIZE * 2 + 1]; 
+unsigned char magic_sha256[SHA256_DIGEST_SIZE];        /* sha256 hash value of magic string */
+char zero_block[SHA256_DIGEST_SIZE * 2 + 1];   /* hexlified sha256 hash value of a block full of zeros */
 
 struct map_node {
-       struct xlock lock;
        uint32_t flags;
-       uint32_t objectlen;
        uint32_t objectidx;
-       char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
-       struct xq pending; /* pending peer_reqs on this object */
+       uint32_t objectlen;
+       char object[XSEG_MAX_TARGET_LEN + 1];   /* NULL terminated string */
+       struct xq pending;                      /* pending peer_reqs on this object */
 };
 
-#define MF_MAP_LOADING (1 << 0)
-#define MF_MAP_ABORT (1 << 1)
+#define MF_MAP_LOADING         (1 << 0)
+#define MF_MAP_DESTROYED       (1 << 1)
 
 struct map {
        uint32_t flags;
        uint64_t size;
        uint32_t volumelen;
        char volume[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
-       struct xlock lock; 
-       xhash_t *objects; // obj_index --> map_node
-       struct xq pending; /* pending peer_reqs on this map */
+       xhash_t *objects;       /* obj_index --> map_node */
+       struct xq pending;      /* pending peer_reqs on this map */
 };
 
 struct mapperd {
        xport bportno;
-       struct xlock maps_lock;
        xhash_t *hashmaps; // hash_function(target) --> struct map
 };
 
 struct mapper_io {
-       struct xlock lock;
-       volatile uint32_t copyups;
-       xhash_t *copyups_nodes;
-       int err;
+       volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
+       xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
+       int err;                        /* error flag */
 };
 
+/*
+ * Helper functions
+ */
+
 static inline struct mapperd * __get_mapperd(struct peerd *peer)
 {
        return (struct mapperd *) peer->priv;
@@ -73,7 +73,35 @@ static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
        return (struct mapper_io *) pr->priv;
 }
 
-static struct map * __find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
+static inline uint64_t calc_map_obj(struct map *map)
+{
+       uint64_t nr_objs = map->size / block_size;
+       if (map->size % block_size)
+               nr_objs++;
+       return nr_objs;
+}
+
+static uint32_t calc_nr_obj(struct xseg_request *req)
+{
+       unsigned int r = 1;
+       uint64_t rem_size = req->size;
+       uint64_t obj_offset = req->offset & (block_size -1); //modulo
+       uint64_t obj_size =  (rem_size > block_size) ? block_size - obj_offset : rem_size;
+       rem_size -= obj_size;
+       while (rem_size > 0) {
+               obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
+               rem_size -= obj_size;
+               r++;
+       }
+
+       return r;
+}
+
+/*
+ * Maps handling functions
+ */
+
+static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
 {
        int r;
        struct map *m = NULL;
@@ -84,28 +112,16 @@ static struct map * __find_map(struct mapperd *mapper, char *target, uint32_t ta
        r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
        if (r < 0)
                return NULL;
-       print_map(m);
        return m;
 }
 
-static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
-{
-       struct map *m = NULL;
-
-       xlock_acquire(&mapper->maps_lock, 1);
-       m = __find_map(mapper, target, targetlen);
-       xlock_release(&mapper->maps_lock);
-
-       return m;
-}
 
 static int insert_map(struct mapperd *mapper, struct map *map)
 {
        int r = -1;
        
-       xlock_acquire(&mapper->maps_lock, 1);
-       if (__find_map(mapper, map->volume, map->volumelen)){
-               printf("map found in insert map\n");
+       if (find_map(mapper, map->volume, map->volumelen)){
+               //printf("map found in insert map\n");
                goto out;
        }
        
@@ -119,7 +135,6 @@ static int insert_map(struct mapperd *mapper, struct map *map)
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
        }
 out:
-       xlock_release(&mapper->maps_lock);
        return r;
 }
 
@@ -127,7 +142,6 @@ static int remove_map(struct mapperd *mapper, struct map *map)
 {
        int r = -1;
        
-       xlock_acquire(&mapper->maps_lock, 1);
        //assert no pending pr on map
        
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
@@ -140,7 +154,6 @@ static int remove_map(struct mapperd *mapper, struct map *map)
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
        }
 out:
-       xlock_release(&mapper->maps_lock);
        return r;
 }
 
@@ -151,9 +164,8 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
        xport p;
        struct xseg_request *req;
        struct mapperd *mapper = __get_mapperd(peer);
-       //struct peerd *peer = __get_peerd(mapper); 
        void *dummy;
-       printf("Loading map\n");
+       //printf("Loading map\n");
 
        struct map *m = find_map(mapper, target, targetlen);
        if (!m) {
@@ -170,22 +182,18 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
                        goto out_map;
                }
                m->objects = xhash_new(3, INTEGER); //FIXME err_check;
+               if (!m->objects)
+                       goto out_q;
                __xq_append_tail(&m->pending, (xqindex) pr);
-               xlock_release(&m->lock);
        } else {
                goto map_exists;
        }
 
        r = insert_map(mapper, m);
-       if (r < 0) { // someone beat us (or resize error) 
-               xq_free(&m->pending);
-               free(m);
-               m = find_map(mapper, target, targetlen);
-               if (!m)
-                       goto out_err;
-               goto map_exists;
-       }
-       printf("Loading map: preparing req\n");
+       if (r < 0)  
+               goto out_hash;
+       
+       //printf("Loading map: preparing req\n");
 
        req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC);
        if (!req)
@@ -210,7 +218,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
        
-       printf("Loading map: request issued\n");
+       //printf("Loading map: request issued\n");
        return 0;
 
 out_unset:
@@ -219,15 +227,15 @@ out_put:
        xseg_put_request(peer->xseg, req, peer->portno);
 
 out_fail:
-       //remove m from maps
-       //fail pending reqs;    //FIXME possible race if someone got map but not appended pr to pending yet
-                               // probably refcount with get/put will help 
        remove_map(mapper, m);
        xqindex idx;
-       while((idx = xq_pop_head(&m->pending, 1)) != Noneidx) {
+       while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
                fail(peer, (struct peer_req *) idx);
        }
-//out_q:
+
+out_hash:
+       xhash_free(m->objects);
+out_q:
        xq_free(&m->pending);
 out_map:
        free(m);
@@ -235,48 +243,44 @@ out_err:
        return -1;
 
 map_exists:
-       xlock_acquire(&m->lock, 1);
+       //assert map loading when this is reached
        if (m->flags & MF_MAP_LOADING) {
                __xq_append_tail(&m->pending, (xqindex) pr);
-               xlock_release(&m->lock);
        }
        else {
-               xlock_release(&m->lock);
                dispatch(peer, pr, pr->req);
        }
        return 0;
 }
 
 
-#define MAP_LOADING 1
-static int find_or_load_map(struct peer* peer, struct peer_req *pr, 
+static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
                                char *target, uint32_t targetlen, struct map **m)
 {
        struct mapperd *mapper = __get_mapperd(peer);
        int r;
-       printf("find map or load\n");
+       //printf("find map or load\n");
        *m = find_map(mapper, target, targetlen);
        if (*m) {
-               printf("map found\n");
-               xlock_acquire(&((*m)->lock), 1);
+               //printf("map found\n");
                if ((*m)->flags & MF_MAP_LOADING) {
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
-                       xlock_release(&((*m)->lock));
-                       printf("Map loading\n");
-                       return MAP_LOADING;
+                       //printf("Map loading\n");
+                       return MF_PENDING;
                } else {
-                       printf("Map returned\n");
-                       xlock_release(&((*m)->lock));
+                       //printf("Map returned\n");
                        return 0;
                }
        }
        r = load_map(peer, pr, target, targetlen);
        if (r < 0)
                return -1; //error
-       return MAP_LOADING;     
+       return MF_PENDING;      
 }
 
-
+/*
+ * Object handling functions
+ */
 
 struct map_node *find_object(struct map *map, uint64_t obj_index)
 {
@@ -301,6 +305,34 @@ static int insert_object(struct map *map, struct map_node *mn)
        return r;
 }
 
+
+/*
+ * map read/write functions 
+ */
+static inline void pithosmap_to_object(struct map_node *mn, char *buf)
+{
+       int i;
+       //hexlify sha256 value
+       for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
+               sprintf(mn->object, "%02x", buf[i]);
+       }
+
+       mn->object[XSEG_MAX_TARGET_LEN] = 0;
+       mn->objectlen = strlen(mn->object);
+       mn->flags = 0;
+}
+
+static inline void map_to_object(struct map_node *mn, char *buf)
+{
+       char c = buf[0];
+       mn->flags = 0;
+       if (c)
+               mn->flags |= MF_OBJECT_EXIST;
+       memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN);
+       mn->object[XSEG_MAX_TARGET_LEN] = 0;
+       mn->objectlen = strlen(mn->object);
+}
+
 static inline void object_to_map(char* buf, struct map_node *mn)
 {
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
@@ -308,14 +340,20 @@ static inline void object_to_map(char* buf, struct map_node *mn)
        memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGET_LEN - mn->objectlen); //zero out the rest of the buffer
 }
 
-static int object_write(struct peerd *peer, struct peer_req *pr, 
-                               struct map *map, uint64_t objidx)
+static inline void mapheader_to_map(struct map *m, char *buf)
+{
+       uint64_t pos = 0;
+       memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
+       pos += SHA256_DIGEST_SIZE;
+       memcpy(buf + pos, &m->size, sizeof(m->size));
+       pos += sizeof(m->size);
+}
+
+
+static int object_write(struct peerd *peer, struct peer_req *pr, struct map_node *mn)
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct map_node *mn = find_object(map, objidx);
-       if (!mn)
-               goto out_err;
        struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
                                                        mapper->bportno, X_ALLOC);
        if (!req)
@@ -326,7 +364,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, mn->object, mn->objectlen);
        req->size = objectsize_in_map;
-       req->offset = mapheader_size + objidx * objectsize_in_map;
+       req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
        req->op = X_WRITE;
        char *data = xseg_get_data(peer->xseg, req);
        object_to_map(data, mn);
@@ -339,7 +377,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
 
-       return PENDING;
+       return MF_PENDING;
 
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
@@ -349,21 +387,12 @@ out_err:
        return -1;
 }
 
-static inline void mapheader_to_map(struct map *m, char *buf)
-{
-       uint64_t pos = 0;
-       memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
-       pos += SHA256_DIGEST_SIZE;
-       memcpy(buf + pos, &m->size, sizeof(m->size));
-       pos += sizeof(m->size);
-}
-
 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
        struct map_node *mn;
-       uint64_t i, pos, max_objidx = map->size / block_size;
+       uint64_t i, pos, max_objidx = calc_map_obj(map);
        struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
@@ -392,7 +421,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
-       return PENDING;
+       return MF_PENDING;
 
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
@@ -402,6 +431,65 @@ out_err:
        return -1;
 }
 
+static int read_map (struct peerd *peer, struct map *map, char *buf)
+{
+       char nulls[SHA256_DIGEST_SIZE];
+       memset(nulls, 0, SHA256_DIGEST_SIZE);
+
+       int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
+       if (r) {
+               //read error;
+               return -1;
+       }
+       //type 1, our type, type 0 pithos map
+       int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
+       uint64_t pos;
+       uint64_t i, nr_objs;
+       struct map_node *map_node;
+       if (type) {
+               pos = SHA256_DIGEST_SIZE;
+               map->size = *(uint64_t *) (buf + pos);
+               pos += sizeof(uint64_t);
+               nr_objs = map->size / block_size;
+               if (map->size % block_size)
+                       nr_objs++;
+               map_node = calloc(nr_objs, sizeof(struct map_node));
+               if (!map_node)
+                       return -1;
+
+               for (i = 0; i < nr_objs; i++) {
+                       map_node[i].objectidx = i;
+                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
+                       map_to_object(&map_node[i], buf + pos);
+                       pos += objectsize_in_map;
+                       r = insert_object(map, &map_node[i]); //FIXME error check
+               }
+       } else {
+               pos = 0;
+               uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
+               map_node = calloc(max_nr_objs, sizeof(struct map_node));
+               if (!map_node)
+                       return -1;
+               for (i = 0; i < max_nr_objs; i++) {
+                       if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
+                               break;
+                       map_node[i].objectidx = i;
+                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
+                       pithosmap_to_object(&map_node[i], buf + pos);
+                       pos += SHA256_DIGEST_SIZE; 
+                       r = insert_object(map, &map_node[i]); //FIXME error check
+               }
+               map->size = i * block_size; 
+       }
+       return 0;
+
+       //FIXME cleanup on error
+}
+
+/*
+ * copy up functions
+ */
+
 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
 {
        int r = 0;
@@ -440,7 +528,6 @@ static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_re
        return mn;
 }
 
-// mn->lock held,
 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
 {
        struct mapperd *mapper = __get_mapperd(peer);
@@ -450,11 +537,12 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        xport p;
        struct sha256_ctx sha256ctx;
        uint32_t newtargetlen;
-       char new_target[XSEG_MAX_TARGET_LEN]; 
+       char new_target[XSEG_MAX_TARGET_LEN + 1]; 
        unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
-       char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding
+       char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
        strncpy(new_object, mn->object, mn->objectlen);
        sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
+       new_object[XSEG_MAX_TARGET_LEN + 19] = 0;
 
 
        /* calculate new object name */
@@ -463,7 +551,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        sha256_finish_ctx(&sha256ctx, buf);
        for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
                sprintf (new_target + 2*i, "%02x", buf[i]);
-       newtargetlen = SHA256_DIGEST_SIZE;
+       newtargetlen = SHA256_DIGEST_SIZE  * 2;
 
 
        struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
@@ -476,12 +564,11 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
                goto out_put;
 
        char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, mn->object, mn->objectlen);
-       target[mn->objectlen] = 0;
+       strncpy(target, new_target, newtargetlen);
 
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
-       strncpy(xcopy->target, new_target, newtargetlen);
-       xcopy->target[newtargetlen] = 0;
+       strncpy(xcopy->target, mn->object, mn->objectlen);
+       xcopy->target[mn->objectlen] = 0;
 
        req->offset = 0;
        req->size = block_size;
@@ -509,77 +596,9 @@ out_put:
 
 }
 
-static inline void pithosmap_to_object(struct map_node *mn, char *buf)
-{
-       int i;
-       //hexlify sha256 value
-       for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
-               sprintf(mn->object, "%02x", buf[i]);
-       }
-
-       mn->object[XSEG_MAX_TARGET_LEN] = 0;
-       mn->objectlen = strlen(mn->object);
-       mn->flags = 0;
-}
-
-static inline void map_to_object(struct map_node *mn, char *buf)
-{
-       char c = buf[0];
-       mn->flags = 0;
-       if (c)
-               mn->flags |= MF_OBJECT_EXIST;
-       memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN);
-       mn->object[XSEG_MAX_TARGET_LEN] = 0;
-       mn->objectlen = strlen(mn->object);
-}
-
-
-static int read_map (struct peerd *peer, struct map *map, char *buf)
-{
-       //type 1, our type, type 0 pithos map
-       int r, type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
-       uint64_t pos;
-       uint64_t i, nr_objs;
-       struct map_node *map_node;
-       if (type) {
-               pos = SHA256_DIGEST_SIZE;
-               map->size = *(uint64_t *) (buf + pos);
-               pos += sizeof(uint64_t);
-               nr_objs = map->size / block_size;
-               if (map->size % block_size)
-                       nr_objs++;
-               map_node = calloc(nr_objs, sizeof(struct map_node));
-               if (!map_node)
-                       return -1;
-
-               for (i = 0; i < nr_objs; i++) {
-                       map_node[i].objectidx = i;
-                       xlock_release(&map_node[i].lock);
-                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
-                       map_to_object(&map_node[i], buf + pos);
-                       pos += objectsize_in_map;
-                       r = insert_object(map, &map_node[i]); //FIXME error check
-               }
-       } else {
-               pos = 0;
-               uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
-               map_node = calloc(max_nr_objs, sizeof(struct map_node));
-               if (!map_node)
-                       return -1;
-               for (i = 0; i < max_nr_objs; i++) {
-                       if (!memcmp(buf+pos, "0000000000000000", SHA256_DIGEST_SIZE))
-                               break;
-                       map_node[i].objectidx = i;
-                       xlock_release(&map_node[i].lock);
-                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
-                       pithosmap_to_object(&map_node[i], buf + pos);
-                       pos += SHA256_DIGEST_SIZE; 
-                       r = insert_object(map, &map_node[i]); //FIXME error check
-               }
-               map->size = i * block_size; 
-       }
-       return 0;
-}
+/*
+ * request handling functions
+ */
 
 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
                                struct xseg_request *req)
@@ -600,38 +619,30 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
        char *data = xseg_get_data(peer->xseg, req);
        r = read_map(peer, map, data);
        if (r < 0)
-               goto out_err;
+               goto out_fail;
        
        xseg_put_request(peer->xseg, req, peer->portno);
-       xlock_acquire(&map->lock, 1);
        map->flags &= ~MF_MAP_LOADING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
-               xlock_release(&map->lock);
                dispatch(peer, preq, preq->req);
-               xlock_acquire(&map->lock, 1);
        }
-       xlock_release(&map->lock);
        return 0;
 
 out_fail:
-       xlock_acquire(&map->lock, 1);
+       xseg_put_request(peer->xseg, req, peer->portno);
        map->flags &= ~MF_MAP_LOADING;
-       map->flags |= MF_MAP_ABORT;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
-               xlock_release(&map->lock);
                fail(peer, preq);
-               xlock_acquire(&map->lock, 1);
        }
-       xlock_release(&map->lock);
        remove_map(mapper, map);
        free(map);
        return 0;
 
 out_err:
        xseg_put_request(peer->xseg, req, peer->portno);
-       goto out_fail;
+       return -1;
 }
 
 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
@@ -640,20 +651,22 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr,
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
        (void) mio;
-       printf("handle clone 1\n");
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
        if (!xclone) {
                goto out_err;
        }
-       printf("handle clone 2\n");
        struct map *map;
        int r = find_or_load_map(peer, pr, xclone->target, strlen(xclone->target), &map);
        if (r < 0)
                goto out_err;
-       else if (r == MAP_LOADING)
+       else if (r == MF_PENDING)
                return 0;
+       
+       if (map->flags & MF_MAP_DESTROYED) {
+               fail(peer, pr);
+               return 0;
+       }
 
-       printf("handle clone 3\n");
        //alloc and init struct map
        struct map *clonemap = malloc(sizeof(struct map));
        if (!clonemap) {
@@ -666,7 +679,6 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr,
        xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
        if (!qidx)
                goto out_err_objhash;
-       xlock_release(&clonemap->lock);
        clonemap->size = xclone->size;
        clonemap->flags = 0;
        char *target = xseg_get_target(peer->xseg, pr->req);
@@ -682,24 +694,27 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr,
        }
        int i;
        for (i = 0; i < xclone->size/block_size + 1; i++) {
-               strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //FIXME copy object name from father
-               map_nodes[i].objectlen = strlen(zero_block);
+               struct map_node *mn = find_object(map, i);
+               if (mn) {
+                       strncpy(map_nodes[i].object, mn->object, mn->objectlen);
+                       map_nodes[i].objectlen = mn->objectlen;
+               } else {
+                       strncpy(map_nodes[i].object, zero_block, strlen(zero_block));
+                       map_nodes[i].objectlen = strlen(zero_block);
+               }
                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
                map_nodes[i].flags = 0;
                map_nodes[i].objectidx = i;
-               xlock_release(&map_nodes[i].lock);
-               r = insert_object(map, &map_nodes[i]);
+               xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
+               r = insert_object(clonemap, &map_nodes[i]);
                if (r < 0)
                        goto out_free_all;
        }
        //insert map
-       printf("handle clone 4\n");
        r = insert_map(mapper, clonemap);
        if ( r < 0) {
-               printf("handle clone 6\n");
                goto out_free_all;
        }
-       printf("handle clone 5\n");
 
        complete(peer, pr);
        return 0;
@@ -718,21 +733,6 @@ out_err:
        return -1;
 }
 
-static uint32_t calc_nr_obj(struct xseg_request *req)
-{
-       unsigned int r = 1;
-       uint64_t rem_size = req->size;
-       uint64_t obj_offset = req->offset & (block_size -1); //modulo
-       uint64_t obj_size =  block_size - obj_offset;
-       while (rem_size > 0) {
-               obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
-               rem_size -= obj_size;
-               r++;
-       }
-
-       return r;
-}
-
 static int req2objs(struct peerd *peer, struct peer_req *pr, 
                                        struct map *map, int write)
 {
@@ -746,6 +746,7 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
        strncpy(buf, target, pr->req->targetlen);
        int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
        if (r < 0) {
+               printf("couldn't resize req\n");
                return -1;
        }
        target = xseg_get_target(peer->xseg, pr->req);
@@ -759,26 +760,27 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
        uint64_t rem_size = pr->req->size;
        uint64_t obj_index = pr->req->offset / block_size;
        uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
-       uint64_t obj_size =  block_size - obj_offset;
+       uint64_t obj_size =  (rem_size > block_size) ? block_size - obj_offset : rem_size;
        struct map_node * mn = find_object(map, obj_index);
        if (!mn) {
+               printf("coudn't find obj_index\n");
                goto out_err;
        }
-       xlock_acquire(&mn->lock, 1);
-       if (mn->flags & MF_OBJECT_COPYING) 
+       if (write && mn->flags & MF_OBJECT_COPYING) 
                goto out_object_copying;
        if (write && !(mn->flags & MF_OBJECT_EXIST)) {
                //calc new_target, copy up object
                r = copyup_object(peer, mn, pr);
-               if (r < 0) 
+               if (r < 0) {
+                       printf("err_copy\n");
                        goto out_err_copy;
+               }
                mn->flags |= MF_OBJECT_COPYING;
                goto out_object_copying;
        }
 
        strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
        reply->segs[idx].target[mn->objectlen] = 0;
-       xlock_release(&mn->lock);
        reply->segs[idx].offset = obj_offset;
        reply->segs[idx].size = obj_size;
        rem_size -= obj_size;
@@ -790,22 +792,23 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
                rem_size -= obj_size;
                mn = find_object(map, obj_index);
                if (!mn) {
+                       printf("coudn't find obj_index\n");
                        goto out_err;
                }
-               xlock_acquire(&mn->lock, 1);
-               if (mn->flags & MF_OBJECT_COPYING) 
+               if (write && mn->flags & MF_OBJECT_COPYING) 
                        goto out_object_copying;
                if (write && !(mn->flags & MF_OBJECT_EXIST)) {
                        //calc new_target, copy up object
                        r = copyup_object(peer, mn, pr);
-                       if (r < 0) 
+                       if (r < 0) {
+                               printf("err_copy\n");
                                goto out_err_copy;
+                       }
                        mn->flags |= MF_OBJECT_COPYING;
                        goto out_object_copying;
                }
                strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
                reply->segs[idx].target[mn->objectlen] = 0;
-               xlock_release(&mn->lock);
                reply->segs[idx].offset = obj_offset;
                reply->segs[idx].size = obj_size;
        }
@@ -813,12 +816,12 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
        return 0;
 
 out_object_copying:
-       __xq_append_tail(&mn->pending, (xqindex) pr);
-       xlock_release(&mn->lock);
-       return PENDING;
+       //printf("r2o mn: %lx\n", mn);
+       if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
+               printf("couldn't append pr to tail\n");
+       return MF_PENDING;
 
 out_err_copy:
-       xlock_release(&mn->lock);
 out_err:
        return -1;
 }
@@ -837,13 +840,19 @@ static int handle_mapr(struct peerd *peer, struct peer_req *pr,
                fail(peer, pr);
                return -1;
        }
-       else if (r == MAP_LOADING)
+       else if (r == MF_PENDING)
                return 0;
        
+       if (map->flags & MF_MAP_DESTROYED) {
+               fail(peer, pr);
+               return 0;
+       }
+       
        //get_object
        r = req2objs(peer, pr, map, 0);
-       if  (r < 0)
+       if  (r < 0){
                fail(peer, pr);
+       }
        else if (r == 0)
                complete(peer, pr);
 
@@ -859,47 +868,39 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr,
        (void) mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        int r = 0;
-       xlock_acquire(&mio->lock, 1);
+       //printf("handle copyup reply\n");
        if (req->state & XS_FAILED && !(req->state & XS_SERVED)) {
+               //printf("copy up failed\n");
                mio->err = 1;
                r = 1;
        }
        struct map_node *mn = __get_copyup_node(mio, req);
-       if (!mn)
+       if (!mn){
+               //printf("copy up mn not found\n");
                mio->err =1; //BUG
+       }
        else {
-               xlock_acquire(&mn->lock, 1);
+               //printf("mn: %lx\n", mn);
                mn->flags &= ~MF_OBJECT_COPYING;
-               mn->flags |= MF_OBJECT_EXIST;
                if (!r) {
-                       struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
-                       strncpy(mn->object, xcopy->target, strlen(xcopy->target));
+                       mn->flags |= MF_OBJECT_EXIST;
+                       char *target = xseg_get_target(peer->xseg, req);
+                       strncpy(mn->object, target, req->targetlen);
                }
-               xlock_release(&mn->lock);
        }
        __set_copyup_node(mio, req, NULL);
        xseg_put_request(peer->xseg, req, peer->portno);
 
        mio->copyups--;
-       if (!mio->copyups) {
-               if (mio->err)
-                       fail(peer, pr);
-               else
-                       complete(peer, pr);
-       }
-       xlock_release(&mio->lock);
-
        if (mn) {
                //handle peer_requests waiting on copy up
                xqindex idx;
-               xlock_acquire(&mn->lock, 1);
-               while ((idx = __xq_pop_head(&mn->pending) != Noneidx)){
-                       xlock_release(&mn->lock);
+               //printf("foo\n");
+               while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+                       //printf("dispatching pending\n");
                        struct peer_req * preq = (struct peer_req *) idx;
                        dispatch(peer, preq, preq->req);
-                       xlock_acquire(&mn->lock, 1);
                }
-               xlock_release(&mn->lock);
        }
 
        return 0;
@@ -922,13 +923,27 @@ static int handle_mapw(struct peerd *peer, struct peer_req *pr,
                fail(peer, pr);
                return -1;
        }
-       else if (r == MAP_LOADING)
+       else if (r == MF_PENDING)
                return 0;
+       
+       if (map->flags & MF_MAP_DESTROYED) {
+               printf("map MF_MAP_DESTROYED req %lx\n", pr->req);
+               fail(peer, pr);
+               return 0;
+       }
+       if (mio->err) {
+               //printf("mapw failed\n");
+               fail(peer, pr);
+               return 0;
+       }
+       //printf("handle mapw\n");
 
-
+       mio->err = 0;
        r = req2objs(peer, pr, map, 1);
-       if (r < 0)
+       if (r < 0){
+               printf("req2obj returned r < 0 for req %lx\n", pr->req);
                fail(peer, pr);
+       }
        if (r == 0)
                complete(peer, pr);
        //else copyup pending, wait for pr restart
@@ -950,28 +965,50 @@ static int handle_info(struct peerd *peer, struct peer_req *pr,
        struct mapper_io *mio = __get_mapper_io(pr);
        (void) mio;
        char *target = xseg_get_target(peer->xseg, pr->req);
-       if (!target)
-               return -1;
-       printf("Handle info\n");
+       if (!target) {
+               fail(peer, pr);
+               return 0;
+       }
+       //printf("Handle info\n");
        struct map *map;
        int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
        if (r < 0) {
                fail(peer, pr);
                return -1;
        }
-       else if (r == MAP_LOADING)
+       else if (r == MF_PENDING)
+               return 0;
+       if (map->flags & MF_MAP_DESTROYED) {
+               fail(peer, pr);
                return 0;
-       else {
-               struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
-               xinfo->size = map->size;
-               complete(peer, pr);
        }
+       
+       struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
+       xinfo->size = map->size;
+       complete(peer, pr);
+
        return 0;
 }
 
 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
                                struct xseg_request *req)
 {
+       /*
+       struct map *map;
+       int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
+       if (r < 0) {
+               fail(peer, pr);
+               return -1;
+       }
+       else if (r == MF_PENDING)
+               return 0;
+       map->flags |= MF_MAP_DESTROYED;
+       */
+       //delete map block
+       //do not delete all objects
+       //remove_map(mapper, map);
+       //free(map, map_nodes, all allocated resources);
+       //complete(peer, pr);
        fail(peer, pr);
        return 0;
 }
@@ -1008,10 +1045,13 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
        unsigned char buf[SHA256_DIGEST_SIZE];
        char *zero;
        struct sha256_ctx sha256ctx;
+       /* calculate out magic sha hash value */
        sha256_init_ctx(&sha256ctx);
        sha256_process_bytes(magic_string, strlen(magic_string), &sha256ctx);
        sha256_finish_ctx(&sha256ctx, magic_sha256);
 
+       /* calculate zero block */
+       //FIXME check hash value
        zero = malloc(block_size);
        memset(zero, 0, block_size);
        sha256_init_ctx(&sha256ctx);
@@ -1021,14 +1061,13 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
                sprintf(zero_block + 2*i, "%02x", buf[i]);
        printf("%s \n", zero_block);
 
+       //FIXME error checks
        struct mapperd *mapper = malloc(sizeof(struct mapperd));
-       xlock_release(&mapper->maps_lock);
        mapper->hashmaps = xhash_new(3, STRING);
        peer->priv = mapper;
        
        for (i = 0; i < peer->nr_ops; i++) {
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
-               xlock_release(&mio->lock);
                mio->copyups_nodes = xhash_new(3, INTEGER);
                mio->copyups = 0;
                mio->err = 0;
@@ -1041,9 +1080,19 @@ int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
                        i += 1;
                        continue;
                }
+               /* enforce only one thread */
+               if (!strcmp(argv[i], "-t") && (i+1) < argc){
+                       int t = atoi(argv[i+1]);
+                       if (t != 1) {
+                               printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
+                               return -1;
+                       }
+                       i += 1;
+                       continue;
+               }
        }
 
-       test_map(peer);
+       //test_map(peer);
 
        return 0;
 }
@@ -1063,12 +1112,14 @@ void print_map(struct map *m)
                        m->volume, m->volumelen, m->size, nr_objs);
        uint64_t i;
        struct map_node *mn;
-       if (nr_objs > 1000000)
+       if (nr_objs > 1000000) //FIXME to protect against invalid volume size
                return;
        for (i = 0; i < nr_objs; i++) {
                mn = find_object(m, i);
-               if (!mn)
+               if (!mn){
+                       printf("object idx [%llu] not found!\n", i);
                        continue;
+               }
                print_obj(mn);
        }
 }
index 0275306..9a1ff94 100644 (file)
@@ -314,6 +314,7 @@ int cmd_write(char *target, uint64_t offset)
                fprintf(stderr, "Cannot submit\n");
                return -1;
        }
+       xseg_signal(xseg, p);
 
        return 0;
 }
@@ -985,6 +986,9 @@ void handle_reply(struct xseg_request *req)
                break;
 
        case X_WRITE:
+               fprintf(stdout, "wrote: ");
+               fwrite(req_data, 1, req->datalen, stdout);
+               break;
        case X_SYNC:
        case X_DELETE:
        case X_TRUNCATE:
@@ -1009,6 +1013,7 @@ int cmd_wait(uint32_t nr)
 {
        struct xseg_request *req;
        long ret;
+       init_local_signal(); 
 
        for (;;) {
                req = xseg_receive(xseg, srcport);
index e4102c5..014f003 100644 (file)
@@ -1162,8 +1162,13 @@ int xseg_prep_request ( struct xseg* xseg, struct xseg_request *req,
 int xseg_resize_request (struct xseg *xseg, struct xseg_request *req,
                        uint32_t new_targetlen, uint64_t new_datalen)
 {
-       if (req->bufferlen >= new_datalen + new_targetlen)
+       if (req->bufferlen >= new_datalen + new_targetlen) {
+               req->data = req->buffer;
+               req->target = req->buffer + req->bufferlen - new_targetlen;
+               req->datalen = new_datalen;
+               req->targetlen = new_targetlen;
                return 0;
+       }
 
        if (req->buffer){
                void *ptr = XPTR_TAKE(req->buffer, xseg->segment);
index 399fd72..a747910 100644 (file)
@@ -103,6 +103,7 @@ alloc:
 //     printf("after heap->cur: %llu\n", heap->cur);
        h = (struct xheap_header *) (((unsigned long) mem) + heap->cur);
        h->size = bytes - sizeof(struct xheap_header);
+       h->magic = 0xdeadbeaf;
        XPTRSET(&h->heap, heap);
        heap->cur += bytes;
 
@@ -145,6 +146,9 @@ void xheap_free(void *ptr)
 {
        struct xheap_header *h = __get_header(ptr);
        struct xheap *heap = XPTR(&h->heap);
+       if (h->magic != 0xdeadbeaf) {
+               XSEGLOG("for ptr: %lx, magic %lx != 0xdeadbeaf", ptr, h->magic);
+       }
        void *mem = XPTR(&heap->mem);
        uint64_t size = xheap_get_chunk_size(ptr);
        xptr *free_list = (xptr *) mem;
index c4490ab..dd2139d 100644 (file)
@@ -5,6 +5,7 @@
 #include <xtypes/xlock.h>
 
 struct xheap_header {
+       uint64_t magic;
        XPTR_TYPE(struct xheap) heap;
        uint64_t size;
 };