add obj handlers output in xseg reportall
[archipelago] / xseg / peers / user / mt-mapperd.c
index 0bd4af8..e5021a3 100644 (file)
@@ -3,7 +3,7 @@
 #include <sys/types.h>
 #include <pthread.h>
 #include <xseg/xseg.h>
-#include <mpeer.h>
+#include <peer.h>
 #include <time.h>
 #include <xtypes/xlock.h>
 #include <xtypes/xhash.h>
 #include <fcntl.h>
 #include <gcrypt.h>
 #include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
 
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
-
-#define MF_PENDING 1
+#define MF_LOAD        (1 << 0)
+#define MF_EXCLUSIVE   (1 << 1)
+#define MF_FORCE       (1 << 2)
 
 #define SHA256_DIGEST_SIZE 32
 /* hex representation of sha256 value takes up double the sha256 size */
 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
 
-#define XSEG_MAX_TARGETLEN (SHA256_DIGEST_SIZE << 1)
-
 #define block_size (1<<22) //FIXME this should be defined here?
 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
@@ -31,37 +32,103 @@ GCRY_THREAD_OPTION_PTHREAD_IMPL;
 #define MF_OBJECT_COPYING      (1 << 1)
 #define MF_OBJECT_WRITING      (1 << 2)
 #define MF_OBJECT_DELETING     (1 << 3)
+#define MF_OBJECT_DELETED      (1 << 4)
+#define MF_OBJECT_DESTROYED    (1 << 5)
 
 #define MF_OBJECT_NOT_READY    (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
-extern struct log_ctx lc;
 
 char *magic_string = "This a magic string. Please hash me";
 unsigned char magic_sha256[SHA256_DIGEST_SIZE];        /* sha256 hash value of magic string */
 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
 
-//internal mapper states
+//dispatch_internal mapper states
 enum mapper_state {
        ACCEPTED = 0,
        WRITING = 1,
        COPYING = 2,
-       DELETING = 3
+       DELETING = 3,
+       DROPPING_CACHE = 4
 };
 
+typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
+
 struct map_node {
        uint32_t flags;
        uint32_t objectidx;
        uint32_t objectlen;
        char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
-       struct xq pending;                      /* pending peer_reqs on this object */
        struct map *map;
+       uint32_t ref;
+       uint32_t waiters;
+       st_cond_t cond;
 };
 
 #define MF_MAP_LOADING         (1 << 0)
 #define MF_MAP_DESTROYED       (1 << 1)
 #define MF_MAP_WRITING         (1 << 2)
 #define MF_MAP_DELETING                (1 << 3)
+#define MF_MAP_DROPPING_CACHE  (1 << 4)
+#define MF_MAP_EXCLUSIVE       (1 << 5)
+#define MF_MAP_OPENING         (1 << 6)
+#define MF_MAP_CLOSING         (1 << 7)
+
+#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
+                                       MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
+
+#define wait_on_pr(__pr, __condition__)        \
+       while (__condition__){                  \
+               ta--;                           \
+               __get_mapper_io(pr)->active = 0;\
+               XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
+               st_cond_wait(__pr->cond);       \
+       }
+
+#define wait_on_mapnode(__mn, __condition__)   \
+       while (__condition__){                  \
+               ta--;                           \
+               __mn->waiters++;                \
+               XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
+               st_cond_wait(__mn->cond);       \
+       }
+
+#define wait_on_map(__map, __condition__)      \
+       while (__condition__){                  \
+               ta--;                           \
+               __map->waiters++;               \
+               XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
+               st_cond_wait(__map->cond);      \
+       }
+
+#define signal_pr(__pr)                                \
+       do {                                    \
+               if (!__get_mapper_io(pr)->active){\
+                       ta++;                   \
+                       XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
+                       __get_mapper_io(pr)->active = 1;\
+                       st_cond_signal(__pr->cond);     \
+               }                               \
+       }while(0)
+
+#define signal_map(__map)                      \
+       do {                                    \
+               if (__map->waiters) {           \
+                       ta += 1;                \
+                       XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
+                       __map->waiters--;       \
+                       st_cond_signal(__map->cond);    \
+               }                               \
+       }while(0)
+
+#define signal_mapnode(__mn)                   \
+       do {                                    \
+               if (__mn->waiters) {            \
+                       ta += __mn->waiters;    \
+                       XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
+                       __mn->waiters = 0;      \
+                       st_cond_broadcast(__mn->cond);  \
+               }                               \
+       }while(0)
 
-#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING)
 
 struct map {
        uint32_t flags;
@@ -69,7 +136,9 @@ struct map {
        uint32_t volumelen;
        char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
        xhash_t *objects;       /* obj_index --> map_node */
-       struct xq pending;      /* pending peer_reqs on this map */
+       uint32_t ref;
+       uint32_t waiters;
+       st_cond_t cond;
 };
 
 struct mapperd {
@@ -82,12 +151,18 @@ struct mapper_io {
        volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
        xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
        struct map_node *copyup_node;
-       int err;                        /* error flag */
+       volatile int err;                       /* error flag */
+       volatile uint64_t del_pending;
        uint64_t delobj;
+       uint64_t dcobj;
+       cb_t cb;
        enum mapper_state state;
+       volatile int active;
 };
 
-static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
+/* global vars */
+struct mapperd *mapper;
+
 void print_map(struct map *m);
 
 /*
@@ -106,6 +181,8 @@ static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
 
 static inline uint64_t calc_map_obj(struct map *map)
 {
+       if (map->size == -1)
+               return 0;
        uint64_t nr_objs = map->size / block_size;
        if (map->size % block_size)
                nr_objs++;
@@ -131,6 +208,8 @@ static uint32_t calc_nr_obj(struct xseg_request *req)
 /*
  * Maps handling functions
  */
+//FIXME assert targetlen > 0
+
 
 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
 {
@@ -140,6 +219,7 @@ static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targ
        //assert targetlen <= XSEG_MAX_TARGETLEN
        strncpy(buf, target, targetlen);
        buf[targetlen] = 0;
+       XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
        r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
        if (r < 0)
                return NULL;
@@ -155,10 +235,12 @@ static int insert_map(struct mapperd *mapper, struct map *map)
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
                goto out;
        }
-       
+
+       XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
+                       map->volume, strlen(map->volume), (unsigned long) map);
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_grow_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
@@ -179,8 +261,8 @@ static int remove_map(struct mapperd *mapper, struct map *map)
        //assert no pending pr on map
        
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_shrink_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
@@ -194,126 +276,83 @@ out:
        return r;
 }
 
-/* async map load */
-static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
+static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
 {
        int r;
        xport p;
+       struct peerd *peer = pr->peer;
        struct xseg_request *req;
        struct mapperd *mapper = __get_mapperd(peer);
        void *dummy;
-       //printf("Loading map\n");
-
-       struct map *m = find_map(mapper, target, targetlen);
-       if (!m) {
-               m = malloc(sizeof(struct map));
-               if (!m){
-                       XSEGLOG2(&lc, E, "Cannot allocate map ");
-                       goto out_err;
-               }
-               m->size = -1;
-               strncpy(m->volume, target, targetlen);
-               m->volume[targetlen] = 0;
-               m->volumelen = targetlen;
-               m->flags = MF_MAP_LOADING;
-               xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
-               if (!qidx) {
-                       XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
-                                       m->volume);
-                       goto out_map;
-               }
-               m->objects = xhash_new(3, INTEGER); 
-               if (!m->objects){
-                       XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
-                                       m->volume);
-                       goto out_q;
-               }
-               __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
-       } else {
-               goto map_exists;
-       }
 
-       r = insert_map(mapper, m);
-       if (r < 0)  
-               goto out_hash;
-       
-       //printf("Loading map: preparing req\n");
+       XSEGLOG2(&lc, I, "Closing map %s", map->volume);
 
-       req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
-                               m->volume);
-               goto out_fail;
+                               map->volume);
+               goto out_err;
        }
 
-       r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
+       r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
-                               m->volume);
+                               map->volume);
                goto out_put;
        }
 
        char *reqtarget = xseg_get_target(peer->xseg, req);
        if (!reqtarget)
                goto out_put;
-       strncpy(reqtarget, target, targetlen);
-       req->op = X_READ;
+       strncpy(reqtarget, map->volume, req->targetlen);
+       req->op = X_CLOSE;
        req->size = block_size;
        req->offset = 0;
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
-                               m->volume);
+                               map->volume);
                goto out_put;
        }
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){ 
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
-                               m->volume);
+                               map->volume);
                goto out_unset;
        }
        r = xseg_signal(peer->xseg, p);
        
-       XSEGLOG2(&lc, I, "Map %s loading", m->volume);
-       return 0;
+       XSEGLOG2(&lc, I, "Map %s closing", map->volume);
+       return req;
 
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
-
-out_fail:
-       remove_map(mapper, m);
-       xqindex idx;
-       while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
-               fail(peer, (struct peer_req *) idx);
-       }
-
-out_hash:
-       xhash_free(m->objects);
-out_q:
-       xq_free(&m->pending);
-out_map:
-       XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
-       free(m);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
-       return -1;
+       return NULL;
+}
 
-map_exists:
-       //assert map loading when this is reached
-       if (m->flags & MF_MAP_LOADING) {
-               XSEGLOG2(&lc, I, "Map %s already exists and loading. "
-                               "Adding to pending queue", m->volume);
-               __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
-       }
-       else {
-               XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
-               my_dispatch(peer, pr, pr->req);
-       }
+static int close_map(struct peer_req *pr, struct map *map)
+{
+       int err;
+       struct xseg_request *req;
+       struct peerd *peer = pr->peer;
+
+       map->flags |= MF_MAP_CLOSING;
+       req = __close_map(pr, map);
+       if (!req)
+               return -1;
+       wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
+       map->flags &= ~MF_MAP_CLOSING;
+       err = req->state & XS_FAILED;
+       xseg_put_request(peer->xseg, req, pr->portno);
+       if (err)
+               return -1;
        return 0;
 }
 
-
+/*
 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
                                char *target, uint32_t targetlen, struct map **m)
 {
@@ -321,6 +360,7 @@ static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
        int r;
        *m = find_map(mapper, target, targetlen);
        if (*m) {
+               XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
                if ((*m)->flags & MF_MAP_NOT_READY) {
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
@@ -333,12 +373,12 @@ static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
                        return 0;
                }
        }
-       r = load_map(peer, pr, target, targetlen);
+       r = open_map(peer, pr, target, targetlen, 0);
        if (r < 0)
                return -1; //error
        return MF_PENDING;      
 }
-
+*/
 /*
  * Object handling functions
  */
@@ -411,12 +451,12 @@ static inline void mapheader_to_map(struct map *m, char *buf)
 }
 
 
-static int object_write(struct peerd *peer, struct peer_req *pr, 
+static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
                                struct map *map, struct map_node *mn)
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
@@ -424,7 +464,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                goto out_err;
        }
-       int r = xseg_prep_request(peer->xseg, req, mn->objectlen, objectsize_in_map);
+       int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
                                "(Map: %s [%llu]",
@@ -432,7 +472,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                goto out_put;
        }
        char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, map->volume, map->volumelen);
+       strncpy(target, map->volume, req->targetlen);
        req->size = objectsize_in_map;
        req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
        req->op = X_WRITE;
@@ -446,7 +486,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
                                "(Map: %s [%llu]",
@@ -461,26 +501,27 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                        "Map: %s [%llu]",
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
 
-       return MF_PENDING;
+       return req;
 
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
                        "(Map: %s [%llu]",
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
-       return -1;
+       return NULL;
 }
 
-static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
+static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
 {
        void *dummy;
+       struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        struct map_node *mn;
        uint64_t i, pos, max_objidx = calc_map_obj(map);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
@@ -519,7 +560,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
                                map->volume);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
@@ -531,24 +572,98 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
 
        map->flags |= MF_MAP_WRITING;
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
-       return MF_PENDING;
+       return req;
 
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
-       return -1;
+       return NULL;
+}
+
+static int write_map(struct peer_req* pr, struct map *map)
+{
+       int r = 0;
+       struct peerd *peer = pr->peer;
+       map->flags |= MF_MAP_WRITING;
+       struct xseg_request *req = __write_map(pr, map);
+       if (!req)
+               return -1;
+       wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
+       if (req->state & XS_FAILED)
+               r = -1;
+       xseg_put_request(peer->xseg, req, pr->portno);
+       map->flags &= ~MF_MAP_WRITING;
+       return r;
+}
+
+static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
+{
+       int r;
+       xport p;
+       struct xseg_request *req;
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       void *dummy;
+
+       XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
+
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
+       if (!req){
+               XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
+                               m->volume);
+               goto out_fail;
+       }
+
+       r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
+                               m->volume);
+               goto out_put;
+       }
+
+       char *reqtarget = xseg_get_target(peer->xseg, req);
+       if (!reqtarget)
+               goto out_put;
+       strncpy(reqtarget, m->volume, req->targetlen);
+       req->op = X_READ;
+       req->size = block_size;
+       req->offset = 0;
+       r = xseg_set_req_data(peer->xseg, req, pr);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot set request data for map %s",
+                               m->volume);
+               goto out_put;
+       }
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+       if (p == NoPort){ 
+               XSEGLOG2(&lc, E, "Cannot submit request for map %s",
+                               m->volume);
+               goto out_unset;
+       }
+       r = xseg_signal(peer->xseg, p);
+       
+       XSEGLOG2(&lc, I, "Map %s loading", m->volume);
+       return req;
+
+out_unset:
+       xseg_get_req_data(peer->xseg, req, &dummy);
+out_put:
+       xseg_put_request(peer->xseg, req, pr->portno);
+out_fail:
+       return NULL;
 }
 
-static int read_map (struct peerd *peer, struct map *map, char *buf)
+static int read_map (struct map *map, char *buf)
 {
        char nulls[SHA256_DIGEST_SIZE];
        memset(nulls, 0, SHA256_DIGEST_SIZE);
 
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
        if (r) {
+               XSEGLOG2(&lc, D, "Read zeros");
                //read error;
                return -1;
        }
@@ -572,8 +687,9 @@ static int read_map (struct peerd *peer, struct map *map, char *buf)
                for (i = 0; i < nr_objs; i++) {
                        map_node[i].map = map;
                        map_node[i].objectidx = i;
-                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
-                       (void) qidx;
+                       map_node[i].waiters = 0;
+                       map_node[i].ref = 1;
+                       map_node[i].cond = st_cond_new(); //FIXME err check;
                        map_to_object(&map_node[i], buf + pos);
                        pos += objectsize_in_map;
                        r = insert_object(map, &map_node[i]); //FIXME error check
@@ -589,20 +705,121 @@ static int read_map (struct peerd *peer, struct map *map, char *buf)
                                break;
                        map_node[i].objectidx = i;
                        map_node[i].map = map;
-                       xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
-                       (void) qidx;
+                       map_node[i].waiters = 0;
+                       map_node[i].ref = 1;
+                       map_node[i].cond = st_cond_new(); //FIXME err check;
                        pithosmap_to_object(&map_node[i], buf + pos);
                        pos += SHA256_DIGEST_SIZE; 
                        r = insert_object(map, &map_node[i]); //FIXME error check
                }
                map->size = i * block_size; 
        }
+       print_map(map);
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
        return 0;
 
        //FIXME cleanup on error
 }
 
+static int load_map(struct peer_req *pr, struct map *map)
+{
+       int r = 0;
+       struct xseg_request *req;
+       struct peerd *peer = pr->peer;
+       map->flags |= MF_MAP_LOADING;
+       req = __load_map(pr, map);
+       if (!req)
+               return -1;
+       wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
+       map->flags &= ~MF_MAP_LOADING;
+       if (req->state & XS_FAILED){
+               XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
+               xseg_put_request(peer->xseg, req, pr->portno);
+               return -1;
+       }
+       r = read_map(map, xseg_get_data(peer->xseg, req));
+       xseg_put_request(peer->xseg, req, pr->portno);
+       return r;
+}
+
+static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
+{
+       int r;
+       xport p;
+       struct xseg_request *req;
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       void *dummy;
+
+       XSEGLOG2(&lc, I, "Opening map %s", m->volume);
+
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
+       if (!req){
+               XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
+                               m->volume);
+               goto out_fail;
+       }
+
+       r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
+                               m->volume);
+               goto out_put;
+       }
+
+       char *reqtarget = xseg_get_target(peer->xseg, req);
+       if (!reqtarget)
+               goto out_put;
+       strncpy(reqtarget, m->volume, req->targetlen);
+       req->op = X_OPEN;
+       req->size = block_size;
+       req->offset = 0;
+       r = xseg_set_req_data(peer->xseg, req, pr);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot set request data for map %s",
+                               m->volume);
+               goto out_put;
+       }
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+       if (p == NoPort){ 
+               XSEGLOG2(&lc, E, "Cannot submit request for map %s",
+                               m->volume);
+               goto out_unset;
+       }
+       r = xseg_signal(peer->xseg, p);
+       
+       XSEGLOG2(&lc, I, "Map %s opening", m->volume);
+       return req;
+
+out_unset:
+       xseg_get_req_data(peer->xseg, req, &dummy);
+out_put:
+       xseg_put_request(peer->xseg, req, pr->portno);
+out_fail:
+       return NULL;
+}
+
+static int open_map(struct peer_req *pr, struct map *map)
+{
+       int err;
+       struct xseg_request *req;
+       struct peerd *peer = pr->peer;
+
+       map->flags |= MF_MAP_OPENING;
+       req = __open_map(pr, map);
+       if (!req)
+               return -1;
+       wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
+       map->flags &= ~MF_MAP_OPENING;
+       err = req->state & XS_FAILED;
+       xseg_put_request(peer->xseg, req, pr->portno);
+       if (err)
+               return -1;
+       else 
+               map->flags |= MF_MAP_EXCLUSIVE;
+       return 0;
+}
+
 /*
  * copy up functions
  */
@@ -610,7 +827,6 @@ static int read_map (struct peerd *peer, struct map *map, char *buf)
 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
 {
        int r = 0;
-       /*
        if (mn){
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
                if (r == -XHASH_ERESIZE) {
@@ -634,27 +850,23 @@ static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, st
                }
        }
 out:
-       */
-       mio->copyup_node = mn;
        return r;
 }
 
 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
 {
-       /*
        struct map_node *mn;
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
        if (r < 0)
                return NULL;
        return mn;
-       */
-       return mio->copyup_node;
 }
 
-static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
+static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
 {
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
+       struct map *map = mn->map;
        void *dummy;
        int r = -1, i;
        xport p;
@@ -664,8 +876,8 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        char new_target[XSEG_MAX_TARGETLEN + 1]; 
        unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
        char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
-       strncpy(new_object, mn->object, mn->objectlen);
-       sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
+       strncpy(new_object, map->volume, map->volumelen);
+       sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
        new_object[XSEG_MAX_TARGETLEN + 19] = 0;
 
        gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
@@ -673,18 +885,24 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
                sprintf (new_target + 2*i, "%02x", buf[i]);
        newtargetlen = SHA256_DIGEST_SIZE  * 2;
 
+       if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
+               goto copyup_zeroblock;
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
-       if (!req)
+       if (!req){
+               XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
                goto out_err;
+       }
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
                                sizeof(struct xseg_request_copy));
-       if (r < 0)
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
                goto out_put;
+       }
 
        char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, new_target, newtargetlen);
+       strncpy(target, new_target, req->targetlen);
 
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
        strncpy(xcopy->target, mn->object, mn->objectlen);
@@ -694,367 +912,391 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        req->size = block_size;
        req->op = X_COPY;
        r = xseg_set_req_data(peer->xseg, req, pr);
-       if (r<0)
+       if (r<0){
+               XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
                goto out_put;
+       }
        r = __set_copyup_node(mio, req, mn);
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
+               XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
                goto out_unset;
        }
        xseg_signal(peer->xseg, p);
-       mio->copyups++;
+//     mio->copyups++;
 
+       mn->flags |= MF_OBJECT_COPYING;
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
-       return 0;
+       return req;
 
 out_unset:
+       r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
-       return -1;
+       return NULL;
 
+copyup_zeroblock:
+       XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
+                       "Proceeding in writing the new object in map");
+       /* construct a tmp map_node for writing purposes */
+       struct map_node newmn = *mn;
+       newmn.flags = MF_OBJECT_EXIST;
+       strncpy(newmn.object, new_target, newtargetlen);
+       newmn.object[newtargetlen] = 0;
+       newmn.objectlen = newtargetlen;
+       newmn.objectidx = mn->objectidx; 
+       req = object_write(peer, pr, map, &newmn);
+       r = __set_copyup_node(mio, req, mn);
+       if (!req){
+               XSEGLOG2(&lc, E, "Object write returned error for object %s"
+                               "\n\t of map %s [%llu]",
+                               mn->object, map->volume, (unsigned long long) mn->objectidx);
+               return NULL;
+       }
+       mn->flags |= MF_OBJECT_WRITING;
+       XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
+       return req;
 }
 
-/*
- * request handling functions
- */
-
-static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
 {
-       int r;
-       xqindex idx;
-       char buf[XSEG_MAX_TARGETLEN];
+       void *dummy;
+       struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
-       //assert req->op = X_READ;
-       char *target = xseg_get_target(peer->xseg, req);
-       struct map *map = find_map(mapper, target, req->targetlen);
-       if (!map)
+       struct mapper_io *mio = __get_mapper_io(pr);
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
+                                                       mapper->bportno, X_ALLOC);
+       XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
+       if (!req){
+               XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
                goto out_err;
-       //assert map->flags & MF_MAP_LOADING
-
-       if (req->state & XS_FAILED)
-               goto out_fail;
-
-       char *data = xseg_get_data(peer->xseg, req);
-       r = read_map(peer, map, data);
-       if (r < 0)
-               goto out_fail;
-       
-       xseg_put_request(peer->xseg, req, peer->portno);
-       map->flags &= ~MF_MAP_LOADING;
-       XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
-               struct peer_req *preq = (struct peer_req *) idx;
-               my_dispatch(peer, preq, preq->req);
        }
-       return 0;
-
-out_fail:
-       XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
-       map->flags &= ~MF_MAP_LOADING;
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
-               struct peer_req *preq = (struct peer_req *) idx;
-               fail(peer, preq);
+       int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
+               goto out_put;
        }
-       remove_map(mapper, map);
-       //FIXME not freeing up all objects + object hash
-       free(map);
-       return 0;
-
-out_err:
-       strncpy(buf, target, req->targetlen);
-       buf[req->targetlen] = 0;
-       XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
-       return -1;
-}
-
-static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
-                               struct xseg_request *req)
-{
-       xqindex idx;
-       char buf[XSEG_MAX_TARGETLEN];
-       struct mapperd *mapper = __get_mapperd(peer);
-       //assert req->op = X_WRITE;
        char *target = xseg_get_target(peer->xseg, req);
-       struct map *map = find_map(mapper, target, req->targetlen);
-       if (!map) {
-               fprintf(stderr, "couldn't find map\n");
-               goto out_err;
+       strncpy(target, mn->object, req->targetlen);
+       req->op = X_DELETE;
+       req->size = req->datalen;
+       req->offset = 0;
+       r = xseg_set_req_data(peer->xseg, req, pr);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
+               goto out_put;
        }
-       //assert map->flags & MF_MAP_WRITING
-
-       if (req->state & XS_FAILED){
-               fprintf(stderr, "write request failed\n");
-               goto out_fail;
-       }
-       
-       xseg_put_request(peer->xseg, req, peer->portno);
-       map->flags &= ~MF_MAP_WRITING;
-       XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
-               struct peer_req *preq = (struct peer_req *) idx;
-               my_dispatch(peer, preq, preq->req);
+       __set_copyup_node(mio, req, mn);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+       if (p == NoPort){
+               XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
+               goto out_unset;
        }
-       return 0;
+       r = xseg_signal(peer->xseg, p);
+       XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
+       return req;
 
+out_unset:
+       xseg_get_req_data(peer->xseg, req, &dummy);
+out_put:
+       xseg_put_request(peer->xseg, req, pr->portno);
+out_err:
+       XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
+       return NULL;
+}
 
-out_fail:
-       XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
-       map->flags &= ~MF_MAP_WRITING;
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
-               struct peer_req *preq = (struct peer_req *) idx;
-               fail(peer, preq);
+static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
+{
+       void *dummy;
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       struct mapper_io *mio = __get_mapper_io(pr);
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
+                                                       mapper->mbportno, X_ALLOC);
+       XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
+       if (!req){
+               XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
+               goto out_err;
        }
-       remove_map(mapper, map);
-       //FIXME not freeing up all objects + object hash
-       free(map);
-       return 0;
+       int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
+               goto out_put;
+       }
+       char *target = xseg_get_target(peer->xseg, req);
+       strncpy(target, map->volume, req->targetlen);
+       req->op = X_DELETE;
+       req->size = req->datalen;
+       req->offset = 0;
+       r = xseg_set_req_data(peer->xseg, req, pr);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
+               goto out_put;
+       }
+       __set_copyup_node(mio, req, NULL);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
+       if (p == NoPort){
+               XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
+               goto out_unset;
+       }
+       r = xseg_signal(peer->xseg, p);
+       map->flags |= MF_MAP_DELETING;
+       XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
+       return req;
 
+out_unset:
+       xseg_get_req_data(peer->xseg, req, &dummy);
+out_put:
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
-       strncpy(buf, target, req->targetlen);
-       buf[req->targetlen] = 0;
-       XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
-       return -1;
+       XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
+       return  NULL;
 }
 
-static int handle_clone(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+
+static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       (void) mio;
-       int r;
-       char buf[XSEG_MAX_TARGETLEN + 1];
-       char *target;
+       struct map_node *mn = find_object(map, index);
+       if (mn)
+               mn->ref++;
+       return mn;
+}
 
-       if (pr->req->op != X_CLONE) {
-               //wtf??
-               XSEGLOG2(&lc, E, "Unknown op %u", req->op);
-               fail(peer, pr);
-               return 0;
+static inline void put_mapnode(struct map_node *mn)
+{
+       mn->ref--;
+       if (!mn->ref){
+               //clean up mn
+               st_cond_destroy(mn->cond);
        }
+}
 
-       if (req->op == X_WRITE){
-                       //assert state = WRITING;
-                       r = handle_mapwrite(peer, pr ,req);
-                       if (r < 0){
-                               XSEGLOG2(&lc, E, "handle mapwrite returned error");
-                               fail(peer, pr);
-                       }
-                       return 0;
-       }
+static inline void __get_map(struct map *map)
+{
+       map->ref++;
+}
 
-       if (mio->state == WRITING) {
-               target = xseg_get_target(peer->xseg, pr->req);
-               strncpy(buf, target, req->targetlen);
-               buf[req->targetlen] = 0;
-               XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
-               complete(peer, pr);
-               return 0;
+static inline void put_map(struct map *map)
+{
+       struct map_node *mn;
+       map->ref--;
+       if (!map->ref){
+               XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
+               //clean up map
+               uint64_t i;
+               for (i = 0; i < calc_map_obj(map); i++) {
+                       mn = get_mapnode(map, i);
+                       if (mn) {
+                               //make sure all pending operations on all objects are completed
+                               if (mn->flags & MF_OBJECT_NOT_READY){
+                                       //this should never happen...
+                                       wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+                               }
+                               mn->flags &= MF_OBJECT_DESTROYED;
+                               put_mapnode(mn); //matchin mn->ref = 1 on mn init
+                               put_mapnode(mn); //matcing get_mapnode;
+                               //assert mn->ref == 0;
+                       }
+               }
+               mn = find_object(map, 0);
+               if (mn)
+                       free(mn);
+               XSEGLOG2(&lc, I, "Freed map %s", map->volume);
+               free(map);
        }
+}
 
-       struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
-       if (!xclone) {
+static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
+{
+       int r;
+       struct map *m = malloc(sizeof(struct map));
+       if (!m){
+               XSEGLOG2(&lc, E, "Cannot allocate map ");
                goto out_err;
        }
-       struct map *map;
-       r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
+       m->size = -1;
+       strncpy(m->volume, name, namelen);
+       m->volume[namelen] = 0;
+       m->volumelen = namelen;
+       m->flags = 0;
+       m->objects = xhash_new(3, INTEGER); 
+       if (!m->objects){
+               XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
+                               m->volume);
+               goto out_map;
+       }
+       m->ref = 1;
+       m->waiters = 0;
+       m->cond = st_cond_new(); //FIXME err check;
+       r = insert_map(mapper, m);
        if (r < 0){
-               goto out_err;
+               XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
+               goto out_hash;
        }
-       else if (r == MF_PENDING)
-               return 0;
-       
-       if (map->flags & MF_MAP_DESTROYED) {
-               strncpy(buf, xclone->target, xclone->targetlen);
-               buf[xclone->targetlen] = 0;
-               XSEGLOG2(&lc, W, "Map %s destroyed", buf);
-               target = xseg_get_target(peer->xseg, pr->req);
-               strncpy(buf, target, req->targetlen);
-               buf[req->targetlen] = 0;
-               XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
-               fail(peer, pr);
-               return 0;
+
+       return m;
+
+out_hash:
+       xhash_free(m->objects);
+out_map:
+       XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
+       free(m);
+out_err:
+       return NULL;
+}
+
+
+
+void deletion_cb(struct peer_req *pr, struct xseg_request *req)
+{
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       (void)mapper;
+       struct mapper_io *mio = __get_mapper_io(pr);
+       struct map_node *mn = __get_copyup_node(mio, req);
+
+       mio->del_pending--;
+       if (req->state & XS_FAILED){
+               mio->err = 1;
        }
+       signal_mapnode(mn);
+       xseg_put_request(peer->xseg, req, pr->portno);
+       signal_pr(pr);
+}
 
-       struct map *clonemap = malloc(sizeof(struct map));
-       if (!clonemap) {
+void copyup_cb(struct peer_req *pr, struct xseg_request *req)
+{
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       (void)mapper;
+       struct mapper_io *mio = __get_mapper_io(pr);
+       struct map_node *mn = __get_copyup_node(mio, req);
+       if (!mn){
+               XSEGLOG2(&lc, E, "Cannot get map node");
                goto out_err;
        }
-       /*
-       FIXME check if clone map exists
-       find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
-       ... (on destroyed what ??
-       if (clonemap) {
-               target = xseg_get_target(peer->xseg, pr->req);
-               strncpy(buf, target, req->targetlen);
-               buf[req->targetlen] = 0;
-               XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
-               fail(peer, pr);
-               return 0;
-       }
-       */
-       //alloc and init struct map
-       clonemap->objects = xhash_new(3, INTEGER);
-       if (!clonemap->objects){
-               goto out_err_clonemap;
-       }
-       xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
-       if (!qidx){
-               goto out_err_objhash;
-       }
-       if (xclone->size < map->size) {
-               target = xseg_get_target(peer->xseg, pr->req);
-               strncpy(buf, target, req->targetlen);
-               buf[req->targetlen] = 0;
-               XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
-                               "\n\t for requested clone %s",
-                               (unsigned long long) xclone->size,
-                               (unsigned long long) map->size, buf);
-               goto out_err_q;
-       }
-       if (xclone->size == -1)
-               clonemap->size = map->size;
-       else
-               clonemap->size = xclone->size;
-       clonemap->flags = 0;
-       target = xseg_get_target(peer->xseg, pr->req);
-       strncpy(clonemap->volume, target, pr->req->targetlen);
-       clonemap->volumelen = pr->req->targetlen;
-       clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
+       __set_copyup_node(mio, req, NULL);
 
-       //alloc and init map_nodes
-       unsigned long c = clonemap->size/block_size + 1;
-       struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
-       if (!map_nodes){
-               goto out_err_q;
+       if (req->state & XS_FAILED){
+               XSEGLOG2(&lc, E, "Req failed");
+               mn->flags &= ~MF_OBJECT_COPYING;
+               mn->flags &= ~MF_OBJECT_WRITING;
+               goto out_err;
        }
-       int i;
-       for (i = 0; i < clonemap->size/block_size + 1; i++) {
-               struct map_node *mn = find_object(map, i);
-               if (mn) {
-                       strncpy(map_nodes[i].object, mn->object, mn->objectlen);
-                       map_nodes[i].objectlen = mn->objectlen;
-               } else {
-                       strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
-                       map_nodes[i].objectlen = strlen(zero_block);
+       if (req->op == X_WRITE) {
+               char *target = xseg_get_target(peer->xseg, req);
+               (void)target;
+               //printf("handle object write replyi\n");
+               __set_copyup_node(mio, req, NULL);
+               //assert mn->flags & MF_OBJECT_WRITING
+               mn->flags &= ~MF_OBJECT_WRITING;
+               
+               struct map_node tmp;
+               char *data = xseg_get_data(peer->xseg, req);
+               map_to_object(&tmp, data);
+               mn->flags |= MF_OBJECT_EXIST;
+               if (mn->flags != MF_OBJECT_EXIST){
+                       XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
+                       goto out_err;
                }
-               map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
-               map_nodes[i].flags = 0;
-               map_nodes[i].objectidx = i;
-               map_nodes[i].map = clonemap;
-               xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
-               r = insert_object(clonemap, &map_nodes[i]);
-               if (r < 0){
-                       goto out_free_all;
+               //assert mn->flags & MF_OBJECT_EXIST
+               strncpy(mn->object, tmp.object, tmp.objectlen);
+               mn->object[tmp.objectlen] = 0;
+               mn->objectlen = tmp.objectlen;
+               XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
+               mio->copyups--;
+               signal_mapnode(mn);
+       } else if (req->op == X_COPY) {
+       //      issue write_object;
+               mn->flags &= ~MF_OBJECT_COPYING;
+               struct map *map = mn->map;
+               if (!map){
+                       XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
+                       goto out_err;
                }
-       }
-       //insert map
-       r = insert_map(mapper, clonemap);
-       if ( r < 0) {
-               XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
-               goto out_free_all;
-       }
-       r = map_write(peer, pr, clonemap);
-       if (r < 0){
-               XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
-               goto out_remove;
-       }
-       else if (r == MF_PENDING) {
-               //maybe move this to map_write
-               XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
-               __xq_append_tail(&clonemap->pending, (xqindex) pr);
-               mio->state = WRITING;
-               return 0;
+
+               /* construct a tmp map_node for writing purposes */
+               char *target = xseg_get_target(peer->xseg, req);
+               struct map_node newmn = *mn;
+               newmn.flags = MF_OBJECT_EXIST;
+               strncpy(newmn.object, target, req->targetlen);
+               newmn.object[req->targetlen] = 0;
+               newmn.objectlen = req->targetlen;
+               newmn.objectidx = mn->objectidx; 
+               struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
+               if (!xreq){
+                       XSEGLOG2(&lc, E, "Object write returned error for object %s"
+                                       "\n\t of map %s [%llu]",
+                                       mn->object, map->volume, (unsigned long long) mn->objectidx);
+                       goto out_err;
+               }
+               mn->flags |= MF_OBJECT_WRITING;
+               __set_copyup_node (mio, xreq, mn);
+
+               XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        } else {
-               //unknown state
-               XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
-               goto out_remove;
+               //wtf??
+               ;
        }
-       
-       return 0;
 
-out_remove:
-       remove_map(mapper, clonemap);
-out_free_all:
-       //FIXME not freeing allocated queues of map_nodes
-       free(map_nodes);
-out_err_q:
-       xq_free(&clonemap->pending);
-out_err_objhash:
-       xhash_free(clonemap->objects);
-out_err_clonemap:
-       free(clonemap);
+out:
+       xseg_put_request(peer->xseg, req, pr->portno);
+       return;
+
 out_err:
-       target = xseg_get_target(peer->xseg, pr->req);
-       strncpy(buf, target, req->targetlen);
-       buf[req->targetlen] = 0;
-       XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
-       fail(peer, pr);
-       return -1;
+       mio->copyups--;
+       XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
+       mio->err = 1;
+       if (mn)
+               signal_mapnode(mn);
+       goto out;
+
 }
 
-static int req2objs(struct peerd *peer, struct peer_req *pr, 
-                                       struct map *map, int write)
+struct r2o {
+       struct map_node *mn;
+       uint64_t offset;
+       uint64_t size;
+};
+
+static int req2objs(struct peer_req *pr, struct map *map, int write)
 {
+       int r = 0;
+       struct peerd *peer = pr->peer;
+       struct mapper_io *mio = __get_mapper_io(pr);
        char *target = xseg_get_target(peer->xseg, pr->req);
        uint32_t nr_objs = calc_nr_obj(pr->req);
        uint64_t size = sizeof(struct xseg_reply_map) + 
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
+       uint32_t idx, i, ready;
+       uint64_t rem_size, obj_index, obj_offset, obj_size; 
+       struct map_node *mn;
+       mio->copyups = 0;
+       XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
 
-       /* resize request to fit reply */
-       char buf[XSEG_MAX_TARGETLEN];
-       strncpy(buf, target, pr->req->targetlen);
-       int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Cannot resize request");
+       /* get map_nodes of request */
+       struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
+       if (!mns){
+               XSEGLOG2(&lc, E, "Cannot allocate mns");
                return -1;
        }
-       target = xseg_get_target(peer->xseg, pr->req);
-       strncpy(target, buf, pr->req->targetlen);
-
-       /* structure reply */
-       struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
-       reply->cnt = nr_objs;
-
-       uint32_t idx = 0;
-       uint64_t rem_size = pr->req->size;
-       uint64_t obj_index = pr->req->offset / block_size;
-       uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
-       uint64_t obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
-       struct map_node * mn = find_object(map, obj_index);
+       idx = 0;
+       rem_size = pr->req->size;
+       obj_index = pr->req->offset / block_size;
+       obj_offset = pr->req->offset & (block_size -1); //modulo
+       obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
+       mn = get_mapnode(map, obj_index);
        if (!mn) {
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
-               XSEGLOG2(&lc, E, "pr->req->offset: %llu, block_size %u\n", 
-                               (unsigned long long) pr->req->offset, 
-                               block_size);
-               goto out_err;
-       }
-       if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
-               goto out_object_copying;
-       if (write && !(mn->flags & MF_OBJECT_EXIST)) {
-               //calc new_target, copy up object
-               r = copyup_object(peer, mn, pr);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Error in copy up object");
-                       goto out_err_copy;
-               }
-               mn->flags |= MF_OBJECT_COPYING;
-               goto out_object_copying;
+               r = -1;
+               goto out;
        }
-
-       strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
-       reply->segs[idx].targetlen = mn->objectlen;
-       reply->segs[idx].target[mn->objectlen] = 0;
-       reply->segs[idx].offset = obj_offset;
-       reply->segs[idx].size = obj_size;
+       mns[idx].mn = mn;
+       mns[idx].offset = obj_offset;
+       mns[idx].size = obj_size;
        rem_size -= obj_size;
        while (rem_size > 0) {
                idx++;
@@ -1062,651 +1304,623 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
                obj_offset = 0;
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
                rem_size -= obj_size;
-               mn = find_object(map, obj_index);
+               mn = get_mapnode(map, obj_index);
                if (!mn) {
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
-                       goto out_err;
+                       r = -1;
+                       goto out;
                }
-               if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
-                       goto out_object_copying;
-               if (write && !(mn->flags & MF_OBJECT_EXIST)) {
-                       //calc new_target, copy up object
-                       r = copyup_object(peer, mn, pr);
-                       if (r < 0) {
-                               XSEGLOG2(&lc, E, "Error in copy up object");
-                               goto out_err_copy;
+               mns[idx].mn = mn;
+               mns[idx].offset = obj_offset;
+               mns[idx].size = obj_size;
+       }
+       if (write) {
+               ready = 0;
+               int can_wait = 0;
+               mio->cb=copyup_cb;
+               while (ready < (idx + 1)){
+                       ready = 0;
+                       for (i = 0; i < (idx+1); i++) {
+                               mn = mns[i].mn;
+                               //do copyups
+                               if (mn->flags & MF_OBJECT_NOT_READY) {
+                                       if (can_wait){
+                                               wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+                                               if (mn->flags & MF_OBJECT_DELETED){
+                                                       mio->err = 1;
+                                               }
+                                               if (mio->err){
+                                                       XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
+                                                       if (!mio->copyups){
+                                                               r = -1;
+                                                               goto out;
+                                                       }
+                                               }
+                                       }
+                               }
+                               else if (!(mn->flags & MF_OBJECT_EXIST)) {
+                                       //calc new_target, copy up object
+                                       if (copyup_object(peer, mn, pr) == NULL){
+                                               XSEGLOG2(&lc, E, "Error in copy up object");
+                                       } else {
+                                               mio->copyups++;
+                                       }
+                               } else {
+                                       ready++;
+                               }
                        }
-                       mn->flags |= MF_OBJECT_COPYING;
-                       goto out_object_copying;
+                       can_wait = 1;
+               }
+               /*
+pending_copyups:
+               while(mio->copyups > 0){
+                       mio->cb = copyup_cb;
+                       wait_on_pr(pr, 0);
+                       ta--;
+                       st_cond_wait(pr->cond);
                }
-               strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
-               reply->segs[idx].targetlen = mn->objectlen;
-               reply->segs[idx].target[mn->objectlen] = 0;
-               reply->segs[idx].offset = obj_offset;
-               reply->segs[idx].size = obj_size;
+               */
        }
 
-       return 0;
-
-out_object_copying:
-       //printf("r2o mn: %lx\n", mn);
-       //printf("volume %s pending on %s\n", map->volume, mn->object);
-       //assert !write
-       if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
-               XSEGLOG2(&lc, E, "Cannot append pr to tail");
-       XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
-                       mn->object, (unsigned long long) mn->objectidx, map->volume);
-       return MF_PENDING;
-
-out_err_copy:
-out_err:
-       return -1;
-}
+       if (mio->err){
+               r = -1;
+               XSEGLOG2(&lc, E, "Mio->err");
+               goto out;
+       }
 
-static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
-{
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       (void)mapper;
-       (void)mio;
-       //get_map
-       char *target = xseg_get_target(peer->xseg, pr->req);
-       struct map *map;
-       int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
+       /* resize request to fit reply */
+       char buf[XSEG_MAX_TARGETLEN];
+       strncpy(buf, target, pr->req->targetlen);
+       r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
        if (r < 0) {
-               fail(peer, pr);
-               return -1;
+               XSEGLOG2(&lc, E, "Cannot resize request");
+               goto out;
        }
-       else if (r == MF_PENDING)
-               return 0;
-       
-       if (map->flags & MF_MAP_DESTROYED) {
-               fail(peer, pr);
-               return 0;
+       target = xseg_get_target(peer->xseg, pr->req);
+       strncpy(target, buf, pr->req->targetlen);
+
+       /* structure reply */
+       struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
+       reply->cnt = nr_objs;
+       for (i = 0; i < (idx+1); i++) {
+               strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
+               reply->segs[i].targetlen = mns[i].mn->objectlen;
+               reply->segs[i].offset = mns[i].offset;
+               reply->segs[i].size = mns[i].size;
        }
-       
-       //get_object
-       r = req2objs(peer, pr, map, 0);
-       if  (r < 0){
-               XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
-                               map->volume, 
-                               (unsigned long long) pr->req->offset, 
-                               (unsigned long long) (pr->req->offset + pr->req->size));
-               fail(peer, pr);
+out:
+       for (i = 0; i < idx; i++) {
+               put_mapnode(mns[i].mn);
        }
-       else if (r == 0)
-               XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
-                               map->volume, 
-                               (unsigned long long) pr->req->offset, 
-                               (unsigned long long) (pr->req->offset + pr->req->size));
-               complete(peer, pr);
-
-       return 0;
-
-
+       free(mns);
+       return r;
 }
 
-static int handle_copyup(struct peerd *peer, struct peer_req *pr,
-                               struct xseg_request *req)
+static int do_dropcache(struct peer_req *pr, struct map *map)
 {
+       struct map_node *mn;
+       struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
-       (void) mapper;
-       struct mapper_io *mio = __get_mapper_io(pr);
-       int r = 0;
-       xqindex idx;
-       struct map_node *mn = __get_copyup_node(mio, req);
-       if (!mn)
-               goto out_err;
-
-       mn->flags &= ~MF_OBJECT_COPYING;
-       if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
-               XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
-               goto out_fail;
-       }
-       struct map *map = mn->map;
-       if (!map){
-               XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
-               goto out_fail;
-       }
-       
-       /* construct a tmp map_node for writing purposes */
-       char *target = xseg_get_target(peer->xseg, req);
-       struct map_node newmn = *mn;
-       newmn.flags = MF_OBJECT_EXIST;
-       strncpy(newmn.object, target, req->targetlen);
-       newmn.object[req->targetlen] = 0;
-       newmn.objectlen = req->targetlen;
-       newmn.objectidx = mn->objectidx; 
-       r = object_write(peer, pr, map, &newmn);
-       if (r != MF_PENDING){
-               XSEGLOG2(&lc, E, "Object write returned error for object %s"
-                               "\n\t of map %s [%llu]",
-                               mn->object, map->volume, (unsigned long long) mn->objectidx);
-               goto out_fail;
+       uint64_t i;
+       XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
+       map->flags |= MF_MAP_DROPPING_CACHE;
+       for (i = 0; i < calc_map_obj(map); i++) {
+               mn = get_mapnode(map, i);
+               if (mn) {
+                       if (!(mn->flags & MF_OBJECT_DESTROYED)){
+                               //make sure all pending operations on all objects are completed
+                               if (mn->flags & MF_OBJECT_NOT_READY){
+                                       wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+                               }
+                               mn->flags &= MF_OBJECT_DESTROYED;
+                       }
+                       put_mapnode(mn);
+               }
        }
-       mn->flags |= MF_OBJECT_WRITING;
-       xseg_put_request(peer->xseg, req, peer->portno);
-       XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
+       map->flags &= ~MF_MAP_DROPPING_CACHE;
+       map->flags |= MF_MAP_DESTROYED;
+       remove_map(mapper, map);
+       XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
+       put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
        return 0;
-
-out_fail:
-       xseg_put_request(peer->xseg, req, peer->portno);
-       __set_copyup_node(mio, req, NULL);
-       while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
-               struct peer_req * preq = (struct peer_req *) idx;
-               fail(peer, preq);
-       }
-
-out_err:
-       XSEGLOG2(&lc, E, "Cannot get map node");
-       return -1;
 }
 
-static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
-                               struct xseg_request *req)
+static int do_info(struct peer_req *pr, struct map *map)
 {
-       xqindex idx;
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       //assert req->op = X_WRITE;
-       char *target = xseg_get_target(peer->xseg, req);
-       (void)target;
-       (void)mapper;
-       //printf("handle object write replyi\n");
-       struct map_node *mn = __get_copyup_node(mio, req);
-       if (!mn)
-               goto out_err;
-       
-       __set_copyup_node(mio, req, NULL);
-       
-       //assert mn->flags & MF_OBJECT_WRITING
-       mn->flags &= ~MF_OBJECT_WRITING;
-       if (req->state & XS_FAILED)
-               goto out_fail;
-
-       struct map_node tmp;
-       char *data = xseg_get_data(peer->xseg, req);
-       map_to_object(&tmp, data);
-       mn->flags |= MF_OBJECT_EXIST;
-       if (mn->flags != MF_OBJECT_EXIST){
-               XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
-               return *(int *) 0;
-       }
-       //assert mn->flags & MF_OBJECT_EXIST
-       strncpy(mn->object, tmp.object, tmp.objectlen);
-       mn->object[tmp.objectlen] = 0;
-       mn->objectlen = tmp.objectlen;
-       xseg_put_request(peer->xseg, req, peer->portno);
-
-       XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
-       while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
-               struct peer_req * preq = (struct peer_req *) idx;
-               my_dispatch(peer, preq, preq->req);
-       }
+       struct peerd *peer = pr->peer;
+       struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
+       xinfo->size = map->size;
        return 0;
+}
 
-out_fail:
-       XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
-       xseg_put_request(peer->xseg, req, peer->portno);
-       while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
-               struct peer_req *preq = (struct peer_req *) idx;
-               fail(peer, preq);
-       }
-       return 0;
 
-out_err:
-       XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
-       xseg_put_request(peer->xseg, req, peer->portno);
-       return -1;
+static int do_close(struct peer_req *pr, struct map *map)
+{
+//     struct peerd *peer = pr->peer;
+//     struct xseg_request *req;
+       if (map->flags & MF_MAP_EXCLUSIVE) 
+               close_map(pr, map);
+       return do_dropcache(pr, map);
 }
 
-static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+static int do_destroy(struct peer_req *pr, struct map *map)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
+       uint64_t i;
+       struct peerd *peer = pr->peer;
        struct mapper_io *mio = __get_mapper_io(pr);
-       (void) mapper;
-       (void) mio;
-       /* handle copy up replies separately */
-       if (req->op == X_COPY){
-               if (handle_copyup(peer, pr, req) < 0){
-                       XSEGLOG2(&lc, E, "Handle copy up returned error");
-                       fail(peer, pr);
-                       return -1;
-               } else {
-                       return 0;
-               }
-       }
-       else if(req->op == X_WRITE){
-               /* handle replies of object write operations */
-               if (handle_objectwrite(peer, pr, req) < 0) {
-                       XSEGLOG2(&lc, E, "Handle object write returned error");
-                       fail(peer, pr);
-                       return -1;
-               } else {
-                       return 0;
-               }
-       }
-
-       char *target = xseg_get_target(peer->xseg, pr->req);
-       struct map *map;
-       int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
-       if (r < 0) {
-               fail(peer, pr);
+       struct map_node *mn;
+       struct xseg_request *req;
+       
+       XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
+       map->flags |= MF_MAP_DELETING;
+       req = delete_map(pr, map);
+       if (!req)
+               return -1;
+       wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
+       if (req->state & XS_FAILED){
+               xseg_put_request(peer->xseg, req, pr->portno);
+               map->flags &= ~MF_MAP_DELETING;
                return -1;
        }
-       else if (r == MF_PENDING)
-               return 0;
-       
-       if (map->flags & MF_MAP_DESTROYED) {
-               fail(peer, pr);
-               return 0;
+       xseg_put_request(peer->xseg, req, pr->portno);
+       //FIXME
+       uint64_t nr_obj = calc_map_obj(map);
+       uint64_t deleted = 0;
+       while (deleted < nr_obj){ 
+               deleted = 0;
+               for (i = 0; i < nr_obj; i++){
+                       mn = get_mapnode(map, i);
+                       if (mn) {
+                               if (!(mn->flags & MF_OBJECT_DESTROYED)){
+                                       if (mn->flags & MF_OBJECT_EXIST){
+                                               //make sure all pending operations on all objects are completed
+                                               if (mn->flags & MF_OBJECT_NOT_READY){
+                                                       wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+                                               }
+                                               req = delete_object(pr, mn);
+                                               if (!req)
+                                                       if (mio->del_pending){
+                                                               goto wait_pending;
+                                                       } else {
+                                                               continue;
+                                                       }
+                                               else {
+                                                       mio->del_pending++;
+                                               }
+                                       }
+                                       mn->flags &= MF_OBJECT_DESTROYED;
+                               }
+                               put_mapnode(mn);
+                       }
+                       deleted++;
+               }
+wait_pending:
+               mio->cb = deletion_cb;
+               wait_on_pr(pr, mio->del_pending > 0);
        }
+       mio->cb = NULL;
+       map->flags &= ~MF_MAP_DELETING;
+       XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
+       return do_close(pr, map);
+}
 
-       r = req2objs(peer, pr, map, 1);
-       if (r < 0){
+static int do_mapr(struct peer_req *pr, struct map *map)
+{
+       struct peerd *peer = pr->peer;
+       int r = req2objs(pr, map, 0);
+       if  (r < 0){
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
                                map->volume, 
                                (unsigned long long) pr->req->offset, 
                                (unsigned long long) (pr->req->offset + pr->req->size));
-               fail(peer, pr);
+               return -1;
        }
-       if (r == 0){
-               XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
-                               map->volume, 
-                               (unsigned long long) pr->req->offset, 
-                               (unsigned long long) (pr->req->offset + pr->req->size));
-               complete(peer, pr);
+       XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
+                       map->volume, 
+                       (unsigned long long) pr->req->offset, 
+                       (unsigned long long) (pr->req->offset + pr->req->size));
+       XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
+                       (unsigned long long) pr->req->offset,
+                       (unsigned long long) pr->req->size);
+       char buf[XSEG_MAX_TARGETLEN+1];
+       struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
+       int i;
+       for (i = 0; i < reply->cnt; i++) {
+               XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
+               strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
+               buf[reply->segs[i].targetlen] = 0;
+               XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
+                               (unsigned long long) reply->segs[i].offset,
+                               (unsigned long long) reply->segs[i].size);
        }
-       //else copyup pending, wait for pr restart
-
        return 0;
 }
 
-static int handle_snap(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+static int do_mapw(struct peer_req *pr, struct map *map)
 {
-       fail(peer, pr);
-       return 0;
-}
-
-static int handle_info(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
-{
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       (void) mapper;
-       (void) mio;
-       char *target = xseg_get_target(peer->xseg, pr->req);
-       if (!target) {
-               fail(peer, pr);
-               return 0;
-       }
-       //printf("Handle info\n");
-       struct map *map;
-       int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
-       if (r < 0) {
-               fail(peer, pr);
+       struct peerd *peer = pr->peer;
+       int r = req2objs(pr, map, 1);
+       if  (r < 0){
+               XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
+                               map->volume, 
+                               (unsigned long long) pr->req->offset, 
+                               (unsigned long long) (pr->req->offset + pr->req->size));
                return -1;
        }
-       else if (r == MF_PENDING)
-               return 0;
-       if (map->flags & MF_MAP_DESTROYED) {
-               fail(peer, pr);
-               return 0;
+       XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
+                       map->volume, 
+                       (unsigned long long) pr->req->offset, 
+                       (unsigned long long) (pr->req->offset + pr->req->size));
+       XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
+                       (unsigned long long) pr->req->offset,
+                       (unsigned long long) pr->req->size);
+       char buf[XSEG_MAX_TARGETLEN+1];
+       struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
+       int i;
+       for (i = 0; i < reply->cnt; i++) {
+               XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
+               strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
+               buf[reply->segs[i].targetlen] = 0;
+               XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
+                               (unsigned long long) reply->segs[i].offset,
+                               (unsigned long long) reply->segs[i].size);
        }
-       
-       struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
-       xinfo->size = map->size;
-       complete(peer, pr);
-
        return 0;
 }
 
-static int delete_object(struct peerd *peer, struct peer_req *pr,
-                               struct map_node *mn)
+//here map is the parent map
+static int do_clone(struct peer_req *pr, struct map *map)
 {
-       void *dummy;
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
+       /*
+       FIXME check if clone map exists
+       clonemap = get_map(pr, target, targetlen, MF_LOAD);
+       if (clonemap)
+               do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
+                                       //      cache on non-exclusive opens or declare a NO_CACHE flag ?
+               return -1;
+       */
 
-       if (!(mn->flags && MF_OBJECT_EXIST)){
-               //cant delete not existing object
+       int r;
+       char buf[XSEG_MAX_TARGETLEN];
+       struct peerd *peer = pr->peer;
+       struct mapperd *mapper = __get_mapperd(peer);
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
+       XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
+       struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
+       if (!clonemap) 
+               return -1;
 
+       if (xclone->size == -1)
+               clonemap->size = map->size;
+       else
+               clonemap->size = xclone->size;
+       if (clonemap->size < map->size){
+               target = xseg_get_target(peer->xseg, pr->req);
+               strncpy(buf, target, pr->req->targetlen);
+               buf[pr->req->targetlen] = 0;
+               XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
+                               "\n\t for requested clone %s",
+                               (unsigned long long) xclone->size,
+                               (unsigned long long) map->size, buf);
+               goto out_err;
+       }
+       
+       //alloc and init map_nodes
+       unsigned long c = clonemap->size/block_size + 1;
+       struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
+       if (!map_nodes){
+               goto out_err;
        }
-       if (xq_count(&mn->pending) != 0) {
-               mio->delobj = mn->objectidx;
-               __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
-               XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
-                               mn->object);
-               return MF_PENDING;
+       int i;
+       for (i = 0; i < clonemap->size/block_size + 1; i++) {
+               struct map_node *mn = get_mapnode(map, i);
+               if (mn) {
+                       strncpy(map_nodes[i].object, mn->object, mn->objectlen);
+                       map_nodes[i].objectlen = mn->objectlen;
+                       put_mapnode(mn);
+               } else {
+                       strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
+                       map_nodes[i].objectlen = strlen(zero_block);
+               }
+               map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
+               map_nodes[i].flags = 0;
+               map_nodes[i].objectidx = i;
+               map_nodes[i].map = clonemap;
+               map_nodes[i].ref = 1;
+               map_nodes[i].waiters = 0;
+               map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
+               r = insert_object(clonemap, &map_nodes[i]);
+               if (r < 0){
+                       XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
+                       goto out_err;
+               }
        }
-
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
-                                                       mapper->bportno, X_ALLOC);
-       if (!req)
+       r = write_map(pr, clonemap);
+       if (r < 0){
+               XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
                goto out_err;
-       int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
-       if (r < 0)
-               goto out_put;
-       char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, mn->object, req->targetlen);
-       req->op = X_DELETE;
-       req->size = req->datalen;
-       req->offset = 0;
-
-       r = xseg_set_req_data(peer->xseg, req, pr);
-       if (r < 0)
-               goto out_put;
-       __set_copyup_node(mio, req, mn);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
-       if (p == NoPort)
-               goto out_unset;
-       r = xseg_signal(peer->xseg, p);
-       mn->flags |= MF_OBJECT_DELETING;
-       XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
-       return MF_PENDING;
+       }
+       return 0;
 
-out_unset:
-       xseg_get_req_data(peer->xseg, req, &dummy);
-out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
 out_err:
-       XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
+       put_map(clonemap);
        return -1;
 }
-static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
-                                struct map_node *mn, int err)
+
+static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       uint64_t idx;
-       struct map *map = mn->map;
-       int r;
-       (void) mio;
-       //if object deletion failed, map deletion must continue
-       //and report OK, since map block has been deleted succesfully
-       //so, no check for err
-
-       //assert object flags OK
-       //free map_node_resources
-       map->flags &= ~MF_OBJECT_DELETING;
-       xq_free(&mn->pending);
-       //find next object
-       idx = mn->objectidx;
-       //remove_object(map, idx);
-       idx++;
-       mn = find_object(map, idx);
-       while (!mn && idx < calc_map_obj(map)) {
-               idx++;
-               mn = find_object(map, idx);
-       }
-       if (mn) {
-               //delete next object or complete;
-               r = delete_object(peer, pr, mn);
+       int r, opened = 0;
+retry_open:
+       if (flags & MF_EXCLUSIVE){
+               r = open_map(pr, map);
                if (r < 0) {
-                       XSEGLOG2(&lc, E, "Object %s delete object return error"
-                                        "\n\t Map: %s [%llu]", 
-                                        mn->object, mn->map->volume, 
-                                        (unsigned long long) mn->objectidx);
-                       goto del_completed;
-               }
-               XSEGLOG2(&lc, I, "Handle object delete OK");
-       } else {
-del_completed:
-               //assert map flags OK
-               map->flags &= ~MF_MAP_DELETING;
-               map->flags |= MF_MAP_DESTROYED;
-               XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
-               //make all pending requests on map to fail
-               while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
-                       struct peer_req * preq = (struct peer_req *) idx;
-                       my_dispatch(peer, preq, preq->req);
+                       if (flags & MF_FORCE){
+
+                               goto retry_open;
+                       }
+               } else {
+                       opened = 1;
                }
-               //free map resources;
-               remove_map(mapper, map);
-               mn = find_object(map, 0);
-               free(mn);
-               xq_free(&map->pending);
-               free(map);
        }
-       return 0;
+       r = load_map(pr, map);
+       if (r < 0 && opened){
+               close_map(pr, map);
+       }
+       return r;
 }
 
-static int delete_map(struct peerd *peer, struct peer_req *pr,
-                       struct map *map)
+struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
 {
-       void *dummy;
+       int r;
+       struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
-                                                       mapper->mbportno, X_ALLOC);
-       if (!req)
-               goto out_err;
-       int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
-       if (r < 0)
-               goto out_put;
-       char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, map->volume, req->targetlen);
-       req->op = X_DELETE;
-       req->size = req->datalen;
-       req->offset = 0;
+       struct map *map = find_map(mapper, name, namelen);
+       if (!map){
+               if (flags & MF_LOAD){
+                       map = create_map(mapper, name, namelen);
+                       if (!map)
+                               return NULL;
+                       r = open_load_map(pr, map, flags);
+                       if (r < 0){
+                               do_dropcache(pr, map);
+                               return NULL;
+                       }
+               } else {
+                       return NULL;
+               }
+       } else if (map->flags & MF_MAP_DESTROYED){
+               return NULL;
+       }
+       __get_map(map);
+       return map;
 
-       r = xseg_set_req_data(peer->xseg, req, pr);
-       if (r < 0)
-               goto out_put;
-       __set_copyup_node(mio, req, NULL);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
-       if (p == NoPort)
-               goto out_unset;
-       r = xseg_signal(peer->xseg, p);
-       map->flags |= MF_MAP_DELETING;
-       XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
-       return MF_PENDING;
+}
 
-out_unset:
-       xseg_get_req_data(peer->xseg, req, &dummy);
-out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
-out_err:
-       XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
-       return -1;
+static int map_action(int (action)(struct peer_req *pr, struct map *map),
+               struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
+{
+       //struct peerd *peer = pr->peer;
+       struct map *map;
+start:
+       map = get_map(pr, name, namelen, flags);
+       if (!map)
+               return -1;
+       if (map->flags & MF_MAP_NOT_READY){
+               wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
+               put_map(map);
+               goto start;
+       }
+       int r = action(pr, map);
+       //always drop cache if map not read exclusively
+       if (!(map->flags & MF_MAP_EXCLUSIVE))
+               do_dropcache(pr, map);
+       //maybe capture ref before and compare here?
+       if (map->ref > 1){
+               signal_map(map);
+       }
+       put_map(map);
+       return r;
 }
 
-static int handle_map_delete(struct peerd *peer, struct peer_req *pr, 
-                               struct map *map, int err)
+void * handle_info(struct peer_req *pr)
+{
+       struct peerd *peer = pr->peer;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
+}
+
+void * handle_clone(struct peer_req *pr)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       xqindex idx;
        int r;
-       (void) mio;
-       map->flags &= ~MF_MAP_DELETING;
-       if (err) {
-               XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
-               //dispatch all pending
-               while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
-                       struct peer_req * preq = (struct peer_req *) idx;
-                       my_dispatch(peer, preq, preq->req);
-               }
+       struct peerd *peer = pr->peer;
+       struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
+       if (!xclone) {
+               r = -1;
+               goto out;
+       }
+       if (xclone->targetlen){
+               r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
        } else {
-               map->flags |= MF_MAP_DESTROYED;
-               //delete all objects
-               XSEGLOG2(&lc, E, "Map %s map block deleted. Deleting objects", map->volume);
-               struct map_node *mn = find_object(map, 0);
-               if (!mn) {
-                       XSEGLOG2(&lc, E, "Map %s has no object 0", map->volume);
-                       //this should never happen
-                       //make all pending requests on map to fail
-                       while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
-                               struct peer_req * preq = (struct peer_req *) idx;
-                               my_dispatch(peer, preq, preq->req);
+               if (!xclone->size){
+                       r = -1;
+               } else {
+                       struct map *map;
+                       char *target = xseg_get_target(peer->xseg, pr->req);
+                       XSEGLOG2(&lc, I, "Creating volume");
+                       map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
+                       if (map){
+                               XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
+                               if (map->ref <= 2) //initial one + one ref from __get_map
+                                       do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
+                               put_map(map); //matches get_map
+                               r = -1;
+                               goto out;
                        }
-                       //free map resources;
-                       remove_map(mapper, map);
-                       xq_free(&map->pending);
-                       free(map);
-                       return 0;
-               }
-               r = delete_object(peer, pr, mn);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Deleting first object of map %s returned error"
-                                       "\n\t Dispatching pending requests",
-                                       map->volume);
-                       //dispatch all pending
-                       while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
-                               struct peer_req * preq = (struct peer_req *) idx;
-                               my_dispatch(peer, preq, preq->req);
+                       //create a new empty map of size
+                       map = create_map(mapper, target, pr->req->targetlen);
+                       if (!map){
+                               r = -1;
+                               goto out;
+                       }
+                       map->size = xclone->size;
+                       //populate_map with zero objects;
+                       uint64_t nr_objs = xclone->size / block_size;
+                       if (xclone->size % block_size)
+                               nr_objs++;
+                               
+                       struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
+                       if (!map_nodes){
+                               do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
+                               r = -1;
+                               goto out;
+                       }
+                       uint64_t i;
+                       for (i = 0; i < nr_objs; i++) {
+                               strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
+                               map_nodes[i].objectlen = strlen(zero_block);
+                               map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
+                               map_nodes[i].flags = 0;
+                               map_nodes[i].objectidx = i;
+                               map_nodes[i].map = map;
+                               map_nodes[i].ref = 1;
+                               map_nodes[i].waiters = 0;
+                               map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
+                               r = insert_object(map, &map_nodes[i]);
+                               if (r < 0){
+                                       do_dropcache(pr, map);
+                                       r = -1;
+                                       goto out;
+                               }
                        }
+                       r = write_map(pr, map);
+                       if (r < 0){
+                               XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
+                               do_dropcache(pr, map);
+                               goto out;
+                       }
+                       XSEGLOG2(&lc, I, "Volume %s created", map->volume);
+                       r = 0;
+                       do_dropcache(pr, map); //drop cache here for consistency
                }
        }
-       return 0;
+out:
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
 }
 
-static int handle_delete(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+void * handle_mapr(struct peer_req *pr)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       struct map_node *mn;
-       struct map *map;
-       int err = 0;
-       if (req->state & XS_FAILED && !(req->state &XS_SERVED)) 
-               err = 1;
-       
-       mn = __get_copyup_node(mio, req);
-       __set_copyup_node(mio, req, NULL);
-       char *target = xseg_get_target(peer->xseg, req);
-       if (!mn) {
-               //map block delete
-               map = find_map(mapper, target, req->targetlen);
-               if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
-                       return -1;
-               }
-               handle_map_delete(peer, pr, map, err);
-       } else {
-               //object delete
-               map = mn->map;
-               if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
-                       return -1;
-               }
-               handle_object_delete(peer, pr, mn, err);
-       }
-       xseg_put_request(peer->xseg, req, peer->portno);
-       return 0;
+       struct peerd *peer = pr->peer;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
 }
 
-static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
-                               struct xseg_request *req)
+void * handle_mapw(struct peer_req *pr)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       struct mapper_io *mio = __get_mapper_io(pr);
-       (void) mapper;
-       int r;
-
-       if (pr->req != req && req->op == X_DELETE) {
-               //assert mio->state == DELETING
-               r = handle_delete(peer, pr, req);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Handle delete returned error");
-                       fail(peer, pr);
-                       return -1;
-               } else {
-                       return 0;
-               }
-       }
+       struct peerd *peer = pr->peer;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       XSEGLOG2(&lc, D, "Ta: %d", ta);
+       ta--;
+       return NULL;
+}
 
-       struct map *map;
+void * handle_destroy(struct peer_req *pr)
+{
+       struct peerd *peer = pr->peer;
        char *target = xseg_get_target(peer->xseg, pr->req);
-       r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
-       if (r < 0) {
+       int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
+       if (r < 0)
                fail(peer, pr);
-               return -1;
-       }
-       else if (r == MF_PENDING)
-               return 0;
-       if (map->flags & MF_MAP_DESTROYED) {
-               if (mio->state == DELETING){
-                       XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
-                       complete(peer, pr);
-               }
-               else{
-                       XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
-                       fprintf(stderr, "map destroyed\n");
-                       fail(peer, pr);
-               }
-               return 0;
-       }
-       if (mio->state == DELETING) {
-               //continue deleting map objects;
-               struct map_node *mn = find_object(map, mio->delobj);
-               if (!mn) {
-                       complete(peer, pr);
-                       return 0;
-               }
-               r = delete_object(peer, pr, mn);
-               if (r < 0) {
-                       complete(peer, pr);
-               }
-               return 0;
-       }
-       //delete map block
-       r = delete_map(peer, pr, map);
-       if (r < 0) {
-               XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
+}
+
+void * handle_close(struct peer_req *pr)
+{
+       struct peerd *peer = pr->peer;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       //here we do not want to load
+       int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
+       if (r < 0)
                fail(peer, pr);
-               return -1;
-       } else if (r == MF_PENDING) {
-               XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
-               __xq_append_tail(&map->pending, (xqindex) pr);
-               mio->state = DELETING;
-               return 0;
-       }
-       //unreachable
-       XSEGLOG2(&lc, E, "Destroy unreachable");
-       fail(peer, pr);
-       return 0;
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
 }
 
-static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
+int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
+                       struct xseg_request *req)
 {
-       struct mapperd *mapper = __get_mapperd(peer);
-       (void) mapper;
+       //struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
-       (void) mio;
-
-       if (req->op == X_READ) {
-               /* catch map reads requests here */
-               handle_mapread(peer, pr, req);
-               return 0;
-       }
+       void *(*action)(struct peer_req *) = NULL;
 
+       mio->state = ACCEPTED;
+       mio->err = 0;
+       mio->cb = NULL;
        switch (pr->req->op) {
                /* primary xseg operations of mapper */
-               case X_CLONE: handle_clone(peer, pr, req); break;
-               case X_MAPR: handle_mapr(peer, pr, req); break;
-               case X_MAPW: handle_mapw(peer, pr, req); break;
+               case X_CLONE: action = handle_clone; break;
+               case X_MAPR: action = handle_mapr; break;
+               case X_MAPW: action = handle_mapw; break;
 //             case X_SNAPSHOT: handle_snap(peer, pr, req); break;
-               case X_INFO: handle_info(peer, pr, req); break;
-               case X_DELETE: handle_destroy(peer, pr, req); break;
+               case X_INFO: action = handle_info; break;
+               case X_DELETE: action = handle_destroy; break;
+               case X_CLOSE: action = handle_close; break;
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
        }
+       if (action){
+               ta++;
+               mio->active = 1;
+               st_thread_create(action, pr, 0, 0);
+       }
        return 0;
+
 }
 
-int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
+int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
+               enum dispatch_reason reason)
 {
        struct mapperd *mapper = __get_mapperd(peer);
        (void) mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        (void) mio;
 
-       if (pr->req == req)
-               mio->state = ACCEPTED;
-       my_dispatch(peer, pr ,req);
+
+       if (reason == dispatch_accept)
+               dispatch_accepted(peer, pr, req);
+       else {
+               if (mio->cb){
+                       mio->cb(pr, req);
+               } else { 
+                       signal_pr(pr);
+               }
+       }
        return 0;
 }
 
@@ -1714,7 +1928,7 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
 {
        int i;
        unsigned char buf[SHA256_DIGEST_SIZE];
-       char *zero;
+       unsigned char *zero;
 
        gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
 
@@ -1742,15 +1956,17 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
        free(zero);
 
        //FIXME error checks
-       struct mapperd *mapper = malloc(sizeof(struct mapperd));
+       struct mapperd *mapperd = malloc(sizeof(struct mapperd));
+       peer->priv = mapperd;
+       mapper = mapperd;
        mapper->hashmaps = xhash_new(3, STRING);
-       peer->priv = mapper;
        
        for (i = 0; i < peer->nr_ops; i++) {
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
                mio->copyups_nodes = xhash_new(3, INTEGER);
                mio->copyups = 0;
                mio->err = 0;
+               mio->active = 0;
                peer->peer_reqs[i].priv = mio;
        }
 
@@ -1777,6 +1993,10 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
 
+       const struct sched_param param = { .sched_priority = 99 };
+       sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
+
+
 //     test_map(peer);
 
        return 0;