added speer skeletor
[archipelago] / xseg / peers / user / mt-mapperd.c
index 815c971..289f8cd 100644 (file)
@@ -12,6 +12,8 @@
 #include <fcntl.h>
 #include <gcrypt.h>
 #include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
 
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
 
@@ -42,7 +44,8 @@ enum mapper_state {
        ACCEPTED = 0,
        WRITING = 1,
        COPYING = 2,
-       DELETING = 3
+       DELETING = 3,
+       DROPPING_CACHE = 4
 };
 
 struct map_node {
@@ -58,8 +61,10 @@ struct map_node {
 #define MF_MAP_DESTROYED       (1 << 1)
 #define MF_MAP_WRITING         (1 << 2)
 #define MF_MAP_DELETING                (1 << 3)
+#define MF_MAP_DROPPING_CACHE  (1 << 4)
 
-#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING)
+#define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
+                                       MF_MAP_DROPPING_CACHE)
 
 struct map {
        uint32_t flags;
@@ -82,6 +87,7 @@ struct mapper_io {
        struct map_node *copyup_node;
        int err;                        /* error flag */
        uint64_t delobj;
+       uint64_t dcobj;
        enum mapper_state state;
 };
 
@@ -138,7 +144,7 @@ static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targ
        //assert targetlen <= XSEG_MAX_TARGETLEN
        strncpy(buf, target, targetlen);
        buf[targetlen] = 0;
-       XSEGLOG2(&lc, E, "looking up map %s, len %u", buf, targetlen);
+       XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
        r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
        if (r < 0)
                return NULL;
@@ -155,10 +161,11 @@ static int insert_map(struct mapperd *mapper, struct map *map)
                goto out;
        }
 
-       XSEGLOG2(&lc, E, "Inserting map %s, len: %d", map->volume, strlen(map->volume));
+       XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
+                       map->volume, strlen(map->volume), (unsigned long) map);
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_grow_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
@@ -179,8 +186,8 @@ static int remove_map(struct mapperd *mapper, struct map *map)
        //assert no pending pr on map
        
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_shrink_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
@@ -195,14 +202,14 @@ out:
 }
 
 /* async map load */
-static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
+static int load_map(struct peerd *peer, struct peer_req *pr, char *target, 
+                       uint32_t targetlen)
 {
        int r;
        xport p;
        struct xseg_request *req;
        struct mapperd *mapper = __get_mapperd(peer);
        void *dummy;
-       //printf("Loading map\n");
 
        struct map *m = find_map(mapper, target, targetlen);
        if (!m) {
@@ -237,9 +244,8 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
        if (r < 0)  
                goto out_hash;
        
-       //printf("Loading map: preparing req\n");
 
-       req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                m->volume);
@@ -256,7 +262,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
        char *reqtarget = xseg_get_target(peer->xseg, req);
        if (!reqtarget)
                goto out_put;
-       strncpy(reqtarget, target, targetlen);
+       strncpy(reqtarget, target, req->targetlen);
        req->op = X_READ;
        req->size = block_size;
        req->offset = 0;
@@ -266,7 +272,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
                                m->volume);
                goto out_put;
        }
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){ 
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                m->volume);
@@ -280,7 +286,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint3
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
 out_fail:
        remove_map(mapper, m);
@@ -321,6 +327,7 @@ static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
        int r;
        *m = find_map(mapper, target, targetlen);
        if (*m) {
+               XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
                if ((*m)->flags & MF_MAP_NOT_READY) {
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
@@ -416,7 +423,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
@@ -432,7 +439,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                goto out_put;
        }
        char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, map->volume, map->volumelen);
+       strncpy(target, map->volume, req->targetlen);
        req->size = objectsize_in_map;
        req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
        req->op = X_WRITE;
@@ -446,7 +453,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
                                "(Map: %s [%llu]",
@@ -466,7 +473,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
                        "(Map: %s [%llu]",
@@ -480,7 +487,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
        struct mapperd *mapper = __get_mapperd(peer);
        struct map_node *mn;
        uint64_t i, pos, max_objidx = calc_map_obj(map);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
@@ -519,7 +526,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
                                map->volume);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
@@ -536,7 +543,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
        return -1;
@@ -674,8 +681,10 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
                sprintf (new_target + 2*i, "%02x", buf[i]);
        newtargetlen = SHA256_DIGEST_SIZE  * 2;
 
+       if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
+               goto copyup_zeroblock;
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -685,7 +694,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
                goto out_put;
 
        char *target = xseg_get_target(peer->xseg, req);
-       strncpy(target, new_target, newtargetlen);
+       strncpy(target, new_target, req->targetlen);
 
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
        strncpy(xcopy->target, mn->object, mn->objectlen);
@@ -698,24 +707,47 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        if (r<0)
                goto out_put;
        r = __set_copyup_node(mio, req, mn);
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
                goto out_unset;
        }
        xseg_signal(peer->xseg, p);
        mio->copyups++;
 
+       mn->flags |= MF_OBJECT_COPYING;
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
        return 0;
 
 out_unset:
+       r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
        return -1;
 
+copyup_zeroblock:
+       XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
+                       "Proceeding in writing the new object in map");
+       /* construct a tmp map_node for writing purposes */
+       struct map_node newmn = *mn;
+       newmn.flags = MF_OBJECT_EXIST;
+       strncpy(newmn.object, new_target, newtargetlen);
+       newmn.object[newtargetlen] = 0;
+       newmn.objectlen = newtargetlen;
+       newmn.objectidx = mn->objectidx; 
+       r = __set_copyup_node(mio, req, mn);
+       r = object_write(peer, pr, map, &newmn);
+       if (r != MF_PENDING){
+               XSEGLOG2(&lc, E, "Object write returned error for object %s"
+                               "\n\t of map %s [%llu]",
+                               mn->object, map->volume, (unsigned long long) mn->objectidx);
+               return -1;
+       }
+       mn->flags |= MF_OBJECT_WRITING;
+       XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
+       return 0;
 }
 
 /*
@@ -744,11 +776,12 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_fail;
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
-       print_map(map);
        XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
                struct peer_req *preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -756,7 +789,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -771,7 +804,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -795,10 +828,12 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
                struct peer_req *preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -807,7 +842,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -822,7 +857,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -958,7 +993,6 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr,
                        goto out_free_all;
                }
        }
-       print_map(clonemap);
        //insert map
        r = insert_map(mapper, clonemap);
        if ( r < 0) {
@@ -1047,7 +1081,6 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
                        XSEGLOG2(&lc, E, "Error in copy up object");
                        goto out_err_copy;
                }
-               mn->flags |= MF_OBJECT_COPYING;
                goto out_object_copying;
        }
 
@@ -1057,7 +1090,6 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
 //                             block_size);
        strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
        reply->segs[idx].targetlen = mn->objectlen;
-       reply->segs[idx].target[mn->objectlen] = 0;
        reply->segs[idx].offset = obj_offset;
        reply->segs[idx].size = obj_size;
 //     XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
@@ -1084,12 +1116,10 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
                                XSEGLOG2(&lc, E, "Error in copy up object");
                                goto out_err_copy;
                        }
-                       mn->flags |= MF_OBJECT_COPYING;
                        goto out_object_copying;
                }
                strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
                reply->segs[idx].targetlen = mn->objectlen;
-               reply->segs[idx].target[mn->objectlen] = 0;
                reply->segs[idx].offset = obj_offset;
                reply->segs[idx].size = obj_size;
 //             XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
@@ -1215,12 +1245,12 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        mn->flags |= MF_OBJECT_WRITING;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        return 0;
 
 out_fail:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        __set_copyup_node(mio, req, NULL);
        while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req * preq = (struct peer_req *) idx;
@@ -1267,11 +1297,12 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
        strncpy(mn->object, tmp.object, tmp.objectlen);
        mn->object[tmp.objectlen] = 0;
        mn->objectlen = tmp.objectlen;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
-       print_map(mn->map);
        XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
-       while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&mn->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
+               qsize--;
                struct peer_req * preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -1279,7 +1310,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
                fail(peer, preq);
@@ -1288,7 +1319,7 @@ out_fail:
 
 out_err:
        XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -1416,19 +1447,15 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
 
-       if (!(mn->flags && MF_OBJECT_EXIST)){
-               //cant delete not existing object
-
-       }
+       mio->delobj = mn->objectidx;
        if (xq_count(&mn->pending) != 0) {
-               mio->delobj = mn->objectidx;
                __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
                XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
                                mn->object);
                return MF_PENDING;
        }
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1445,7 +1472,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, mn);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1456,11 +1483,66 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
        return -1;
 }
+
+/*
+ * Find next object for deletion. Start searching on idx mio->delobj.
+ * Skip non existing map_nodes, free_resources and skip non-existing objects
+ * Wait for all pending operations on the object, before moving forward to the 
+ * next object.
+ *
+ * Return MF_PENDING if theres is a pending operation on the next object
+ * or zero if there is no next object
+ */
+static int delete_next_object(struct peerd *peer, struct peer_req *pr,
+                               struct map *map)
+{
+       struct mapperd *mapper = __get_mapperd(peer);
+       struct mapper_io *mio = __get_mapper_io(pr);
+       uint64_t idx = mio->delobj;
+       struct map_node *mn;
+       int r;
+retry:
+       while (idx < calc_map_obj(map)) {
+               mn = find_object(map, idx);
+               if (!mn) {
+                       idx++;
+                       goto retry;
+               }
+               mio->delobj = idx;
+               if (xq_count(&mn->pending) != 0) {
+                       __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
+                       XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
+                                       mn->object);
+                       return MF_PENDING;
+               }
+               if (mn->flags & MF_OBJECT_EXIST){
+                       r = delete_object(peer, pr, mn);
+                       if (r < 0) {
+                               /* on error, just log it, release resources and
+                                * proceed to the next object
+                                */
+                               XSEGLOG2(&lc, E, "Object %s delete object return error"
+                                               "\n\t Map: %s [%llu]", 
+                                               mn->object, mn->map->volume, 
+                                               (unsigned long long) mn->objectidx);
+                               xq_free(&mn->pending);
+                       }
+                       else if (r == MF_PENDING){
+                               return r;
+                       }
+               } else {
+                       xq_free(&mn->pending);
+               }
+               idx++;
+       }
+       return 0;
+}
+
 static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
                                 struct map_node *mn, int err)
 {
@@ -1476,36 +1558,23 @@ static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
 
        //assert object flags OK
        //free map_node_resources
-       map->flags &= ~MF_OBJECT_DELETING;
+       mn->flags &= ~MF_OBJECT_DELETING;
        xq_free(&mn->pending);
-       //find next object
-       idx = mn->objectidx;
-       //remove_object(map, idx);
-       idx++;
-       mn = find_object(map, idx);
-       while (!mn && idx < calc_map_obj(map)) {
-               idx++;
-               mn = find_object(map, idx);
-       }
-       if (mn) {
-               //delete next object or complete;
-               r = delete_object(peer, pr, mn);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Object %s delete object return error"
-                                        "\n\t Map: %s [%llu]", 
-                                        mn->object, mn->map->volume, 
-                                        (unsigned long long) mn->objectidx);
-                       goto del_completed;
-               }
-               XSEGLOG2(&lc, I, "Handle object delete OK");
-       } else {
-del_completed:
+
+       mio->delobj++;
+       r = delete_next_object(peer, pr, map);
+       if (r != MF_PENDING){
+               /* if there is no next object to delete, remove the map block
+                * from memory
+                */
+
                //assert map flags OK
-               map->flags &= ~MF_MAP_DELETING;
                map->flags |= MF_MAP_DESTROYED;
                XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
                //make all pending requests on map to fail
-               while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+               uint64_t qsize = xq_count(&map->pending);
+               while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+                       qsize--;
                        struct peer_req * preq = (struct peer_req *) idx;
                        my_dispatch(peer, preq, preq->req);
                }
@@ -1516,6 +1585,7 @@ del_completed:
                xq_free(&map->pending);
                free(map);
        }
+       XSEGLOG2(&lc, I, "Handle object delete OK");
        return 0;
 }
 
@@ -1525,7 +1595,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1542,7 +1612,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, NULL);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1553,7 +1623,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
        return -1;
@@ -1578,32 +1648,30 @@ static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
        } else {
                map->flags |= MF_MAP_DESTROYED;
                //delete all objects
-               XSEGLOG2(&lc, E, "Map %s map block deleted. Deleting objects", map->volume);
-               struct map_node *mn = find_object(map, 0);
-               if (!mn) {
-                       XSEGLOG2(&lc, E, "Map %s has no object 0", map->volume);
-                       //this should never happen
+               XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
+               mio->delobj = 0;
+               r = delete_next_object(peer, pr, map);
+               if (r != MF_PENDING){
+                       /* if there is no next object to delete, remove the map block
+                        * from memory
+                        */
+                       //assert map flags OK
+                       map->flags |= MF_MAP_DESTROYED;
+                       XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
                        //make all pending requests on map to fail
-                       while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+                       uint64_t qsize = xq_count(&map->pending);
+                       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+                               qsize--;
                                struct peer_req * preq = (struct peer_req *) idx;
                                my_dispatch(peer, preq, preq->req);
                        }
                        //free map resources;
                        remove_map(mapper, map);
+                       struct map_node *mn = find_object(map, 0);
+                       if (mn)
+                               free(mn);
                        xq_free(&map->pending);
                        free(map);
-                       return 0;
-               }
-               r = delete_object(peer, pr, mn);
-               if (r < 0) {
-                       XSEGLOG2(&lc, E, "Deleting first object of map %s returned error"
-                                       "\n\t Dispatching pending requests",
-                                       map->volume);
-                       //dispatch all pending
-                       while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
-                               struct peer_req * preq = (struct peer_req *) idx;
-                               my_dispatch(peer, preq, preq->req);
-                       }
                }
        }
        return 0;
@@ -1627,7 +1695,7 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //map block delete
                map = find_map(mapper, target, req->targetlen);
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_map_delete(peer, pr, map, err);
@@ -1635,12 +1703,12 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //object delete
                map = mn->map;
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_object_delete(peer, pr, mn, err);
        }
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return 0;
 }
 
@@ -1651,7 +1719,16 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
        struct mapper_io *mio = __get_mapper_io(pr);
        (void) mapper;
        int r;
+       char buf[XSEG_MAX_TARGETLEN+1];
+       char *target = xseg_get_target(peer->xseg, pr->req);
 
+       strncpy(buf, target, pr->req->targetlen);
+       buf[req->targetlen] = 0;
+
+       XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
+                       (unsigned long) pr, (unsigned long) pr->req,
+                       (unsigned long) req);
+       XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
        if (pr->req != req && req->op == X_DELETE) {
                //assert mio->state == DELETING
                r = handle_delete(peer, pr, req);
@@ -1665,7 +1742,6 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
        }
 
        struct map *map;
-       char *target = xseg_get_target(peer->xseg, pr->req);
        r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
        if (r < 0) {
                fail(peer, pr);
@@ -1680,20 +1756,14 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
                }
                else{
                        XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
-                       fprintf(stderr, "map destroyed\n");
                        fail(peer, pr);
                }
                return 0;
        }
        if (mio->state == DELETING) {
                //continue deleting map objects;
-               struct map_node *mn = find_object(map, mio->delobj);
-               if (!mn) {
-                       complete(peer, pr);
-                       return 0;
-               }
-               r = delete_object(peer, pr, mn);
-               if (r < 0) {
+               r = delete_next_object(peer ,pr, map);
+               if (r != MF_PENDING){
                        complete(peer, pr);
                }
                return 0;
@@ -1716,6 +1786,80 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
        return 0;
 }
 
+static int handle_dropcache(struct peerd *peer, struct peer_req *pr, 
+                               struct xseg_request *req)
+{
+       struct mapperd *mapper = __get_mapperd(peer);
+       struct mapper_io *mio = __get_mapper_io(pr);
+       (void) mapper;
+       (void) mio;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       if (!target) {
+               fail(peer, pr);
+               return 0;
+       }
+
+       struct map *map = find_map(mapper, target, pr->req->targetlen);
+       if (!map){
+               complete(peer, pr);
+               return 0;
+       } else if (map->flags & MF_MAP_DESTROYED) {
+               complete(peer, pr);
+               return 0;
+       } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
+               __xq_append_tail(&map->pending, (xqindex) pr);
+               return 0;
+       }
+
+       if (mio->state != DROPPING_CACHE) {
+               /* block all future operations on the map */
+               map->flags |= MF_MAP_DROPPING_CACHE;
+               mio->dcobj = 0;
+               mio->state = DROPPING_CACHE;
+               XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
+       } else {
+               XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
+       }
+
+       struct map_node *mn; 
+       uint64_t i;
+       for (i = mio->dcobj; i < calc_map_obj(map); i++) {
+               mn = find_object(map, i);
+               if (!mn)
+                       continue;
+               mio->dcobj = i;
+               if (xq_count(&mn->pending) != 0){
+                       XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu", 
+                               map->volume, (unsigned long long) mn->objectidx);
+                       __xq_append_tail(&mn->pending, (xqindex) pr);
+                       return 0;
+               }
+               xq_free(&mn->pending);
+               XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu", 
+                               map->volume, (unsigned long long) mn->objectidx);
+       }
+       remove_map(mapper, map);
+       //dispatch pending
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
+               struct peer_req * preq = (struct peer_req *) i;
+               my_dispatch(peer, preq, preq->req);
+       }
+       XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
+       
+       //free map resources;
+       mn = find_object(map, 0);
+       if (mn)
+               free(mn);
+       xq_free(&map->pending);
+       free(map);
+
+       complete(peer, pr);
+
+       return 0;
+}
+
 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
 {
        struct mapperd *mapper = __get_mapperd(peer);
@@ -1737,6 +1881,7 @@ static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_requ
 //             case X_SNAPSHOT: handle_snap(peer, pr, req); break;
                case X_INFO: handle_info(peer, pr, req); break;
                case X_DELETE: handle_destroy(peer, pr, req); break;
+               case X_CLOSE: handle_dropcache(peer, pr, req); break;
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
        }
        return 0;
@@ -1822,6 +1967,10 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
 
+       const struct sched_param param = { .sched_priority = 99 };
+       sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
+
+
 //     test_map(peer);
 
        return 0;