#include <fcntl.h>
#include <gcrypt.h>
#include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
GCRY_THREAD_OPTION_PTHREAD_IMPL;
ACCEPTED = 0,
WRITING = 1,
COPYING = 2,
- DELETING = 3
+ DELETING = 3,
+ DROPPING_CACHE = 4
};
struct map_node {
#define MF_MAP_DESTROYED (1 << 1)
#define MF_MAP_WRITING (1 << 2)
#define MF_MAP_DELETING (1 << 3)
+#define MF_MAP_DROPPING_CACHE (1 << 4)
-#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING)
+#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
+ MF_MAP_DROPPING_CACHE)
struct map {
uint32_t flags;
struct map_node *copyup_node;
int err; /* error flag */
uint64_t delobj;
+ uint64_t dcobj;
enum mapper_state state;
};
//assert targetlen <= XSEG_MAX_TARGETLEN
strncpy(buf, target, targetlen);
buf[targetlen] = 0;
- XSEGLOG2(&lc, E, "looking up map %s, len %u", buf, targetlen);
+ XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
if (r < 0)
return NULL;
goto out;
}
- XSEGLOG2(&lc, E, "Inserting map %s, len: %d", map->volume, strlen(map->volume));
+ XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
+ map->volume, strlen(map->volume), (unsigned long) map);
r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
- if (r == -XHASH_ERESIZE) {
- xhashidx shift = xhash_grow_size_shift(map->objects);
+ while (r == -XHASH_ERESIZE) {
+ xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
if (!new_hashmap){
XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
//assert no pending pr on map
r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
- if (r == -XHASH_ERESIZE) {
- xhashidx shift = xhash_shrink_size_shift(map->objects);
+ while (r == -XHASH_ERESIZE) {
+ xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
if (!new_hashmap){
XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
}
/* async map load */
-static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
+static int load_map(struct peerd *peer, struct peer_req *pr, char *target,
+ uint32_t targetlen)
{
int r;
xport p;
struct xseg_request *req;
struct mapperd *mapper = __get_mapperd(peer);
void *dummy;
- //printf("Loading map\n");
struct map *m = find_map(mapper, target, targetlen);
if (!m) {
if (r < 0)
goto out_hash;
- //printf("Loading map: preparing req\n");
- req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+ req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
m->volume);
char *reqtarget = xseg_get_target(peer->xseg, req);
if (!reqtarget)
goto out_put;
- strncpy(reqtarget, target, targetlen);
+ strncpy(reqtarget, target, req->targetlen);
req->op = X_READ;
req->size = block_size;
req->offset = 0;
m->volume);
goto out_put;
}
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
m->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
remove_map(mapper, m);
int r;
*m = find_map(mapper, target, targetlen);
if (*m) {
+ XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
if ((*m)->flags & MF_MAP_NOT_READY) {
__xq_append_tail(&(*m)->pending, (xqindex) pr);
XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
{
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
goto out_put;
}
char *target = xseg_get_target(peer->xseg, req);
- strncpy(target, map->volume, map->volumelen);
+ strncpy(target, map->volume, req->targetlen);
req->size = objectsize_in_map;
req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
req->op = X_WRITE;
mn->object, map->volume, (unsigned long long) mn->objectidx);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
"(Map: %s [%llu]",
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
"(Map: %s [%llu]",
struct mapperd *mapper = __get_mapperd(peer);
struct map_node *mn;
uint64_t i, pos, max_objidx = calc_map_obj(map);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
map->volume);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
map->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
return -1;
sprintf (new_target + 2*i, "%02x", buf[i]);
newtargetlen = SHA256_DIGEST_SIZE * 2;
+ if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
+ goto copyup_zeroblock;
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
goto out_put;
char *target = xseg_get_target(peer->xseg, req);
- strncpy(target, new_target, newtargetlen);
+ strncpy(target, new_target, req->targetlen);
struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
strncpy(xcopy->target, mn->object, mn->objectlen);
if (r<0)
goto out_put;
r = __set_copyup_node(mio, req, mn);
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort) {
goto out_unset;
}
xseg_signal(peer->xseg, p);
mio->copyups++;
+ mn->flags |= MF_OBJECT_COPYING;
XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
return 0;
out_unset:
+ r = __set_copyup_node(mio, req, NULL);
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
return -1;
+copyup_zeroblock:
+ XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
+ "Proceeding in writing the new object in map");
+ /* construct a tmp map_node for writing purposes */
+ struct map_node newmn = *mn;
+ newmn.flags = MF_OBJECT_EXIST;
+ strncpy(newmn.object, new_target, newtargetlen);
+ newmn.object[newtargetlen] = 0;
+ newmn.objectlen = newtargetlen;
+ newmn.objectidx = mn->objectidx;
+ r = __set_copyup_node(mio, req, mn);
+ r = object_write(peer, pr, map, &newmn);
+ if (r != MF_PENDING){
+ XSEGLOG2(&lc, E, "Object write returned error for object %s"
+ "\n\t of map %s [%llu]",
+ mn->object, map->volume, (unsigned long long) mn->objectidx);
+ return -1;
+ }
+ mn->flags |= MF_OBJECT_WRITING;
+ XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
+ return 0;
}
/*
if (r < 0)
goto out_fail;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
- print_map(map);
XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
- while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req *preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_fail;
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
- while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req *preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_free_all;
}
}
- print_map(clonemap);
//insert map
r = insert_map(mapper, clonemap);
if ( r < 0) {
XSEGLOG2(&lc, E, "Error in copy up object");
goto out_err_copy;
}
- mn->flags |= MF_OBJECT_COPYING;
goto out_object_copying;
}
// block_size);
strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
reply->segs[idx].targetlen = mn->objectlen;
- reply->segs[idx].target[mn->objectlen] = 0;
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
// XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
XSEGLOG2(&lc, E, "Error in copy up object");
goto out_err_copy;
}
- mn->flags |= MF_OBJECT_COPYING;
goto out_object_copying;
}
strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
reply->segs[idx].targetlen = mn->objectlen;
- reply->segs[idx].target[mn->objectlen] = 0;
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
// XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
goto out_fail;
}
mn->flags |= MF_OBJECT_WRITING;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
return 0;
out_fail:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
__set_copyup_node(mio, req, NULL);
while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req * preq = (struct peer_req *) idx;
strncpy(mn->object, tmp.object, tmp.objectlen);
mn->object[tmp.objectlen] = 0;
mn->objectlen = tmp.objectlen;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
- print_map(mn->map);
XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
- while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&mn->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
fail(peer, preq);
out_err:
XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
struct mapperd *mapper = __get_mapperd(peer);
struct mapper_io *mio = __get_mapper_io(pr);
- if (!(mn->flags && MF_OBJECT_EXIST)){
- //cant delete not existing object
-
- }
+ mio->delobj = mn->objectidx;
if (xq_count(&mn->pending) != 0) {
- mio->delobj = mn->objectidx;
__xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
mn->object);
return MF_PENDING;
}
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, mn);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
return -1;
}
+
+/*
+ * Find next object for deletion. Start searching on idx mio->delobj.
+ * Skip non existing map_nodes, free_resources and skip non-existing objects
+ * Wait for all pending operations on the object, before moving forward to the
+ * next object.
+ *
+ * Return MF_PENDING if theres is a pending operation on the next object
+ * or zero if there is no next object
+ */
+static int delete_next_object(struct peerd *peer, struct peer_req *pr,
+ struct map *map)
+{
+ struct mapperd *mapper = __get_mapperd(peer);
+ struct mapper_io *mio = __get_mapper_io(pr);
+ uint64_t idx = mio->delobj;
+ struct map_node *mn;
+ int r;
+retry:
+ while (idx < calc_map_obj(map)) {
+ mn = find_object(map, idx);
+ if (!mn) {
+ idx++;
+ goto retry;
+ }
+ mio->delobj = idx;
+ if (xq_count(&mn->pending) != 0) {
+ __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
+ XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
+ mn->object);
+ return MF_PENDING;
+ }
+ if (mn->flags & MF_OBJECT_EXIST){
+ r = delete_object(peer, pr, mn);
+ if (r < 0) {
+ /* on error, just log it, release resources and
+ * proceed to the next object
+ */
+ XSEGLOG2(&lc, E, "Object %s delete object return error"
+ "\n\t Map: %s [%llu]",
+ mn->object, mn->map->volume,
+ (unsigned long long) mn->objectidx);
+ xq_free(&mn->pending);
+ }
+ else if (r == MF_PENDING){
+ return r;
+ }
+ } else {
+ xq_free(&mn->pending);
+ }
+ idx++;
+ }
+ return 0;
+}
+
static int handle_object_delete(struct peerd *peer, struct peer_req *pr,
struct map_node *mn, int err)
{
//assert object flags OK
//free map_node_resources
- map->flags &= ~MF_OBJECT_DELETING;
+ mn->flags &= ~MF_OBJECT_DELETING;
xq_free(&mn->pending);
- //find next object
- idx = mn->objectidx;
- //remove_object(map, idx);
- idx++;
- mn = find_object(map, idx);
- while (!mn && idx < calc_map_obj(map)) {
- idx++;
- mn = find_object(map, idx);
- }
- if (mn) {
- //delete next object or complete;
- r = delete_object(peer, pr, mn);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Object %s delete object return error"
- "\n\t Map: %s [%llu]",
- mn->object, mn->map->volume,
- (unsigned long long) mn->objectidx);
- goto del_completed;
- }
- XSEGLOG2(&lc, I, "Handle object delete OK");
- } else {
-del_completed:
+
+ mio->delobj++;
+ r = delete_next_object(peer, pr, map);
+ if (r != MF_PENDING){
+ /* if there is no next object to delete, remove the map block
+ * from memory
+ */
+
//assert map flags OK
- map->flags &= ~MF_MAP_DELETING;
map->flags |= MF_MAP_DESTROYED;
XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
//make all pending requests on map to fail
- while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
xq_free(&map->pending);
free(map);
}
+ XSEGLOG2(&lc, I, "Handle object delete OK");
return 0;
}
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
struct mapper_io *mio = __get_mapper_io(pr);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, NULL);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
return -1;
} else {
map->flags |= MF_MAP_DESTROYED;
//delete all objects
- XSEGLOG2(&lc, E, "Map %s map block deleted. Deleting objects", map->volume);
- struct map_node *mn = find_object(map, 0);
- if (!mn) {
- XSEGLOG2(&lc, E, "Map %s has no object 0", map->volume);
- //this should never happen
+ XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
+ mio->delobj = 0;
+ r = delete_next_object(peer, pr, map);
+ if (r != MF_PENDING){
+ /* if there is no next object to delete, remove the map block
+ * from memory
+ */
+ //assert map flags OK
+ map->flags |= MF_MAP_DESTROYED;
+ XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
//make all pending requests on map to fail
- while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
//free map resources;
remove_map(mapper, map);
+ struct map_node *mn = find_object(map, 0);
+ if (mn)
+ free(mn);
xq_free(&map->pending);
free(map);
- return 0;
- }
- r = delete_object(peer, pr, mn);
- if (r < 0) {
- XSEGLOG2(&lc, E, "Deleting first object of map %s returned error"
- "\n\t Dispatching pending requests",
- map->volume);
- //dispatch all pending
- while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
- struct peer_req * preq = (struct peer_req *) idx;
- my_dispatch(peer, preq, preq->req);
- }
}
}
return 0;
//map block delete
map = find_map(mapper, target, req->targetlen);
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_map_delete(peer, pr, map, err);
//object delete
map = mn->map;
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_object_delete(peer, pr, mn, err);
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return 0;
}
struct mapper_io *mio = __get_mapper_io(pr);
(void) mapper;
int r;
+ char buf[XSEG_MAX_TARGETLEN+1];
+ char *target = xseg_get_target(peer->xseg, pr->req);
+ strncpy(buf, target, pr->req->targetlen);
+ buf[req->targetlen] = 0;
+
+ XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
+ (unsigned long) pr, (unsigned long) pr->req,
+ (unsigned long) req);
+ XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
if (pr->req != req && req->op == X_DELETE) {
//assert mio->state == DELETING
r = handle_delete(peer, pr, req);
}
struct map *map;
- char *target = xseg_get_target(peer->xseg, pr->req);
r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
if (r < 0) {
fail(peer, pr);
}
else{
XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
- fprintf(stderr, "map destroyed\n");
fail(peer, pr);
}
return 0;
}
if (mio->state == DELETING) {
//continue deleting map objects;
- struct map_node *mn = find_object(map, mio->delobj);
- if (!mn) {
- complete(peer, pr);
- return 0;
- }
- r = delete_object(peer, pr, mn);
- if (r < 0) {
+ r = delete_next_object(peer ,pr, map);
+ if (r != MF_PENDING){
complete(peer, pr);
}
return 0;
return 0;
}
+static int handle_dropcache(struct peerd *peer, struct peer_req *pr,
+ struct xseg_request *req)
+{
+ struct mapperd *mapper = __get_mapperd(peer);
+ struct mapper_io *mio = __get_mapper_io(pr);
+ (void) mapper;
+ (void) mio;
+ char *target = xseg_get_target(peer->xseg, pr->req);
+ if (!target) {
+ fail(peer, pr);
+ return 0;
+ }
+
+ struct map *map = find_map(mapper, target, pr->req->targetlen);
+ if (!map){
+ complete(peer, pr);
+ return 0;
+ } else if (map->flags & MF_MAP_DESTROYED) {
+ complete(peer, pr);
+ return 0;
+ } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
+ __xq_append_tail(&map->pending, (xqindex) pr);
+ return 0;
+ }
+
+ if (mio->state != DROPPING_CACHE) {
+ /* block all future operations on the map */
+ map->flags |= MF_MAP_DROPPING_CACHE;
+ mio->dcobj = 0;
+ mio->state = DROPPING_CACHE;
+ XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
+ } else {
+ XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
+ }
+
+ struct map_node *mn;
+ uint64_t i;
+ for (i = mio->dcobj; i < calc_map_obj(map); i++) {
+ mn = find_object(map, i);
+ if (!mn)
+ continue;
+ mio->dcobj = i;
+ if (xq_count(&mn->pending) != 0){
+ XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu",
+ map->volume, (unsigned long long) mn->objectidx);
+ __xq_append_tail(&mn->pending, (xqindex) pr);
+ return 0;
+ }
+ xq_free(&mn->pending);
+ XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu",
+ map->volume, (unsigned long long) mn->objectidx);
+ }
+ remove_map(mapper, map);
+ //dispatch pending
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
+ struct peer_req * preq = (struct peer_req *) i;
+ my_dispatch(peer, preq, preq->req);
+ }
+ XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
+
+ //free map resources;
+ mn = find_object(map, 0);
+ if (mn)
+ free(mn);
+ xq_free(&map->pending);
+ free(map);
+
+ complete(peer, pr);
+
+ return 0;
+}
+
static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
{
struct mapperd *mapper = __get_mapperd(peer);
// case X_SNAPSHOT: handle_snap(peer, pr, req); break;
case X_INFO: handle_info(peer, pr, req); break;
case X_DELETE: handle_destroy(peer, pr, req); break;
+ case X_CLOSE: handle_dropcache(peer, pr, req); break;
default: fprintf(stderr, "mydispatch: unknown up\n"); break;
}
return 0;
}
}
+ const struct sched_param param = { .sched_priority = 99 };
+ sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);
+
+
// test_map(peer);
return 0;