#include <fcntl.h>
#include <gcrypt.h>
#include <errno.h>
+#include <sched.h>
+#include <sys/syscall.h>
GCRY_THREAD_OPTION_PTHREAD_IMPL;
//assert targetlen <= XSEG_MAX_TARGETLEN
strncpy(buf, target, targetlen);
buf[targetlen] = 0;
- XSEGLOG2(&lc, E, "looking up map %s, len %u", buf, targetlen);
+ XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
if (r < 0)
return NULL;
goto out;
}
- XSEGLOG2(&lc, E, "Inserting map %s, len: %d", map->volume, strlen(map->volume));
+ XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
+ map->volume, strlen(map->volume), (unsigned long) map);
r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
- if (r == -XHASH_ERESIZE) {
- xhashidx shift = xhash_grow_size_shift(map->objects);
+ while (r == -XHASH_ERESIZE) {
+ xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
if (!new_hashmap){
XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
//assert no pending pr on map
r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
- if (r == -XHASH_ERESIZE) {
- xhashidx shift = xhash_shrink_size_shift(map->objects);
+ while (r == -XHASH_ERESIZE) {
+ xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
if (!new_hashmap){
XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
goto out_hash;
- req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
+ req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
m->volume);
m->volume);
goto out_put;
}
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
m->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
remove_map(mapper, m);
int r;
*m = find_map(mapper, target, targetlen);
if (*m) {
+ XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
if ((*m)->flags & MF_MAP_NOT_READY) {
__xq_append_tail(&(*m)->pending, (xqindex) pr);
XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
{
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
mn->object, map->volume, (unsigned long long) mn->objectidx);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
"(Map: %s [%llu]",
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
"(Map: %s [%llu]",
struct mapperd *mapper = __get_mapperd(peer);
struct map_node *mn;
uint64_t i, pos, max_objidx = calc_map_obj(map);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req){
XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
map->volume);
goto out_put;
}
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort){
XSEGLOG2(&lc, E, "Cannot submit request for map %s",
map->volume);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
return -1;
if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE))
goto copyup_zeroblock;
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
if (r<0)
goto out_put;
r = __set_copyup_node(mio, req, mn);
- p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort) {
goto out_unset;
}
r = __set_copyup_node(mio, req, NULL);
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
return -1;
if (r < 0)
goto out_fail;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
- print_map(map);
XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
- while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req *preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_LOADING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_fail;
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
- while((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req *preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
map->flags &= ~MF_MAP_WRITING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
strncpy(buf, target, req->targetlen);
buf[req->targetlen] = 0;
XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
goto out_free_all;
}
}
- print_map(clonemap);
//insert map
r = insert_map(mapper, clonemap);
if ( r < 0) {
// block_size);
strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
reply->segs[idx].targetlen = mn->objectlen;
- reply->segs[idx].target[mn->objectlen] = 0;
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
// XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
}
strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
reply->segs[idx].targetlen = mn->objectlen;
- reply->segs[idx].target[mn->objectlen] = 0;
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
// XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
goto out_fail;
}
mn->flags |= MF_OBJECT_WRITING;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
return 0;
out_fail:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
__set_copyup_node(mio, req, NULL);
while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req * preq = (struct peer_req *) idx;
strncpy(mn->object, tmp.object, tmp.objectlen);
mn->object[tmp.objectlen] = 0;
mn->objectlen = tmp.objectlen;
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
- print_map(mn->map);
XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
- while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&mn->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
out_fail:
XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
fail(peer, preq);
out_err:
XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
return MF_PENDING;
}
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->bportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, mn);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
return -1;
//free map_node_resources
mn->flags &= ~MF_OBJECT_DELETING;
xq_free(&mn->pending);
-
+
+ mio->delobj++;
r = delete_next_object(peer, pr, map);
if (r != MF_PENDING){
/* if there is no next object to delete, remove the map block
map->flags |= MF_MAP_DESTROYED;
XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
//make all pending requests on map to fail
- while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
struct mapper_io *mio = __get_mapper_io(pr);
- struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
+ struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
mapper->mbportno, X_ALLOC);
if (!req)
goto out_err;
if (r < 0)
goto out_put;
__set_copyup_node(mio, req, NULL);
- xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
+ xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
out_err:
XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
return -1;
map->flags |= MF_MAP_DESTROYED;
XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
//make all pending requests on map to fail
- while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) idx;
my_dispatch(peer, preq, preq->req);
}
//map block delete
map = find_map(mapper, target, req->targetlen);
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_map_delete(peer, pr, map, err);
//object delete
map = mn->map;
if (!map) {
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return -1;
}
handle_object_delete(peer, pr, mn, err);
}
- xseg_put_request(peer->xseg, req, peer->portno);
+ xseg_put_request(peer->xseg, req, pr->portno);
return 0;
}
struct mapper_io *mio = __get_mapper_io(pr);
(void) mapper;
int r;
+ char buf[XSEG_MAX_TARGETLEN+1];
+ char *target = xseg_get_target(peer->xseg, pr->req);
+
+ strncpy(buf, target, pr->req->targetlen);
+ buf[req->targetlen] = 0;
+ XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
+ (unsigned long) pr, (unsigned long) pr->req,
+ (unsigned long) req);
+ XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
if (pr->req != req && req->op == X_DELETE) {
//assert mio->state == DELETING
r = handle_delete(peer, pr, req);
}
struct map *map;
- char *target = xseg_get_target(peer->xseg, pr->req);
r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
if (r < 0) {
fail(peer, pr);
map->flags |= MF_MAP_DROPPING_CACHE;
mio->dcobj = 0;
mio->state = DROPPING_CACHE;
+ XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
+ } else {
+ XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
}
struct map_node *mn;
continue;
mio->dcobj = i;
if (xq_count(&mn->pending) != 0){
+ XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu",
+ map->volume, (unsigned long long) mn->objectidx);
__xq_append_tail(&mn->pending, (xqindex) pr);
return 0;
}
xq_free(&mn->pending);
+ XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu",
+ map->volume, (unsigned long long) mn->objectidx);
}
remove_map(mapper, map);
//dispatch pending
- while ((i = __xq_pop_head(&map->pending)) != Noneidx){
+ uint64_t qsize = xq_count(&map->pending);
+ while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
+ qsize--;
struct peer_req * preq = (struct peer_req *) i;
my_dispatch(peer, preq, preq->req);
}
+ XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
//free map resources;
mn = find_object(map, 0);
}
}
+ const struct sched_param param = { .sched_priority = 99 };
+ sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, ¶m);
+
+
// test_map(peer);
return 0;