added speer skeletor
[archipelago] / xseg / peers / user / mt-mapperd.c
index 3f2d91f..289f8cd 100644 (file)
@@ -12,6 +12,8 @@
 #include <fcntl.h>
 #include <gcrypt.h>
 #include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
 
 GCRY_THREAD_OPTION_PTHREAD_IMPL;
 
@@ -142,7 +144,7 @@ static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targ
        //assert targetlen <= XSEG_MAX_TARGETLEN
        strncpy(buf, target, targetlen);
        buf[targetlen] = 0;
-       XSEGLOG2(&lc, E, "looking up map %s, len %u", buf, targetlen);
+       XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
        r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
        if (r < 0)
                return NULL;
@@ -159,10 +161,11 @@ static int insert_map(struct mapperd *mapper, struct map *map)
                goto out;
        }
 
-       XSEGLOG2(&lc, E, "Inserting map %s, len: %d", map->volume, strlen(map->volume));
+       XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
+                       map->volume, strlen(map->volume), (unsigned long) map);
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_grow_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
@@ -183,8 +186,8 @@ static int remove_map(struct mapperd *mapper, struct map *map)
        //assert no pending pr on map
        
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
-       if (r == -XHASH_ERESIZE) {
-               xhashidx shift = xhash_shrink_size_shift(map->objects);
+       while (r == -XHASH_ERESIZE) {
+               xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
@@ -242,7 +245,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
                goto out_hash;
        
 
-       req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+       req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                m->volume);
@@ -269,7 +272,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
                                m->volume);
                goto out_put;
        }
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){ 
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                m->volume);
@@ -283,7 +286,7 @@ static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
 out_fail:
        remove_map(mapper, m);
@@ -324,6 +327,7 @@ static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
        int r;
        *m = find_map(mapper, target, targetlen);
        if (*m) {
+               XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
                if ((*m)->flags & MF_MAP_NOT_READY) {
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
@@ -419,7 +423,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 {
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
@@ -449,7 +453,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
                                "(Map: %s [%llu]",
@@ -469,7 +473,7 @@ static int object_write(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
                        "(Map: %s [%llu]",
@@ -483,7 +487,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
        struct mapperd *mapper = __get_mapperd(peer);
        struct map_node *mn;
        uint64_t i, pos, max_objidx = calc_map_obj(map);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
@@ -522,7 +526,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
                                map->volume);
                goto out_put;
        }
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
@@ -539,7 +543,7 @@ static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
        return -1;
@@ -680,7 +684,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
                goto copyup_zeroblock;
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -703,7 +707,7 @@ static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_re
        if (r<0)
                goto out_put;
        r = __set_copyup_node(mio, req, mn);
-       p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
                goto out_unset;
        }
@@ -718,7 +722,7 @@ out_unset:
        r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
        return -1;
@@ -772,11 +776,12 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_fail;
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
-       print_map(map);
        XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
                struct peer_req *preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -784,7 +789,7 @@ static int handle_mapread(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_LOADING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -799,7 +804,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -823,10 +828,12 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
-       while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
                struct peer_req *preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -835,7 +842,7 @@ static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        map->flags &= ~MF_MAP_WRITING;
        while((idx = __xq_pop_head(&map->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
@@ -850,7 +857,7 @@ out_err:
        strncpy(buf, target, req->targetlen);
        buf[req->targetlen] = 0;
        XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -986,7 +993,6 @@ static int handle_clone(struct peerd *peer, struct peer_req *pr,
                        goto out_free_all;
                }
        }
-       print_map(clonemap);
        //insert map
        r = insert_map(mapper, clonemap);
        if ( r < 0) {
@@ -1084,7 +1090,6 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
 //                             block_size);
        strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
        reply->segs[idx].targetlen = mn->objectlen;
-       reply->segs[idx].target[mn->objectlen] = 0;
        reply->segs[idx].offset = obj_offset;
        reply->segs[idx].size = obj_size;
 //     XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
@@ -1115,7 +1120,6 @@ static int req2objs(struct peerd *peer, struct peer_req *pr,
                }
                strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
                reply->segs[idx].targetlen = mn->objectlen;
-               reply->segs[idx].target[mn->objectlen] = 0;
                reply->segs[idx].offset = obj_offset;
                reply->segs[idx].size = obj_size;
 //             XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
@@ -1241,12 +1245,12 @@ static int handle_copyup(struct peerd *peer, struct peer_req *pr,
                goto out_fail;
        }
        mn->flags |= MF_OBJECT_WRITING;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        return 0;
 
 out_fail:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        __set_copyup_node(mio, req, NULL);
        while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req * preq = (struct peer_req *) idx;
@@ -1293,11 +1297,12 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
        strncpy(mn->object, tmp.object, tmp.objectlen);
        mn->object[tmp.objectlen] = 0;
        mn->objectlen = tmp.objectlen;
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 
-       print_map(mn->map);
        XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
-       while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&mn->pending);
+       while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
+               qsize--;
                struct peer_req * preq = (struct peer_req *) idx;
                my_dispatch(peer, preq, preq->req);
        }
@@ -1305,7 +1310,7 @@ static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
 
 out_fail:
        XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
                struct peer_req *preq = (struct peer_req *) idx;
                fail(peer, preq);
@@ -1314,7 +1319,7 @@ out_fail:
 
 out_err:
        XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return -1;
 }
 
@@ -1450,7 +1455,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
                return MF_PENDING;
        }
 
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1467,7 +1472,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, mn);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1478,7 +1483,7 @@ static int delete_object(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
        return -1;
@@ -1555,7 +1560,8 @@ static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
        //free map_node_resources
        mn->flags &= ~MF_OBJECT_DELETING;
        xq_free(&mn->pending);
-       
+
+       mio->delobj++;
        r = delete_next_object(peer, pr, map);
        if (r != MF_PENDING){
                /* if there is no next object to delete, remove the map block
@@ -1566,7 +1572,9 @@ static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
                map->flags |= MF_MAP_DESTROYED;
                XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
                //make all pending requests on map to fail
-               while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+               uint64_t qsize = xq_count(&map->pending);
+               while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+                       qsize--;
                        struct peer_req * preq = (struct peer_req *) idx;
                        my_dispatch(peer, preq, preq->req);
                }
@@ -1587,7 +1595,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        void *dummy;
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
-       struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
+       struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        if (!req)
                goto out_err;
@@ -1604,7 +1612,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
        if (r < 0)
                goto out_put;
        __set_copyup_node(mio, req, NULL);
-       xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+       xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort)
                goto out_unset;
        r = xseg_signal(peer->xseg, p);
@@ -1615,7 +1623,7 @@ static int delete_map(struct peerd *peer, struct peer_req *pr,
 out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
 out_put:
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
 out_err:
        XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
        return -1;
@@ -1651,7 +1659,9 @@ static int handle_map_delete(struct peerd *peer, struct peer_req *pr,
                        map->flags |= MF_MAP_DESTROYED;
                        XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
                        //make all pending requests on map to fail
-                       while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+                       uint64_t qsize = xq_count(&map->pending);
+                       while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+                               qsize--;
                                struct peer_req * preq = (struct peer_req *) idx;
                                my_dispatch(peer, preq, preq->req);
                        }
@@ -1685,7 +1695,7 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //map block delete
                map = find_map(mapper, target, req->targetlen);
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_map_delete(peer, pr, map, err);
@@ -1693,12 +1703,12 @@ static int handle_delete(struct peerd *peer, struct peer_req *pr,
                //object delete
                map = mn->map;
                if (!map) {
-                       xseg_put_request(peer->xseg, req, peer->portno);
+                       xseg_put_request(peer->xseg, req, pr->portno);
                        return -1;
                }
                handle_object_delete(peer, pr, mn, err);
        }
-       xseg_put_request(peer->xseg, req, peer->portno);
+       xseg_put_request(peer->xseg, req, pr->portno);
        return 0;
 }
 
@@ -1709,7 +1719,16 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
        struct mapper_io *mio = __get_mapper_io(pr);
        (void) mapper;
        int r;
+       char buf[XSEG_MAX_TARGETLEN+1];
+       char *target = xseg_get_target(peer->xseg, pr->req);
+
+       strncpy(buf, target, pr->req->targetlen);
+       buf[req->targetlen] = 0;
 
+       XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
+                       (unsigned long) pr, (unsigned long) pr->req,
+                       (unsigned long) req);
+       XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
        if (pr->req != req && req->op == X_DELETE) {
                //assert mio->state == DELETING
                r = handle_delete(peer, pr, req);
@@ -1723,7 +1742,6 @@ static int handle_destroy(struct peerd *peer, struct peer_req *pr,
        }
 
        struct map *map;
-       char *target = xseg_get_target(peer->xseg, pr->req);
        r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
        if (r < 0) {
                fail(peer, pr);
@@ -1798,6 +1816,9 @@ static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
                map->flags |= MF_MAP_DROPPING_CACHE;
                mio->dcobj = 0;
                mio->state = DROPPING_CACHE;
+               XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
+       } else {
+               XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
        }
 
        struct map_node *mn; 
@@ -1808,17 +1829,24 @@ static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
                        continue;
                mio->dcobj = i;
                if (xq_count(&mn->pending) != 0){
+                       XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu", 
+                               map->volume, (unsigned long long) mn->objectidx);
                        __xq_append_tail(&mn->pending, (xqindex) pr);
                        return 0;
                }
                xq_free(&mn->pending);
+               XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu", 
+                               map->volume, (unsigned long long) mn->objectidx);
        }
        remove_map(mapper, map);
        //dispatch pending
-       while ((i = __xq_pop_head(&map->pending)) != Noneidx){
+       uint64_t qsize = xq_count(&map->pending);
+       while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
+               qsize--;
                struct peer_req * preq = (struct peer_req *) i;
                my_dispatch(peer, preq, preq->req);
        }
+       XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
        
        //free map resources;
        mn = find_object(map, 0);
@@ -1939,6 +1967,10 @@ int custom_peer_init(struct peerd *peer, int argc, char *argv[])
                }
        }
 
+       const struct sched_param param = { .sched_priority = 99 };
+       sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
+
+
 //     test_map(peer);
 
        return 0;