#include <sys/stat.h>
#include <fcntl.h>
-#define PENDING 1;
+#define MF_PENDING 1
-
-#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1) // hex representation of sha256 value takes up double the sha256 size
+/* hex representation of sha256 value takes up double the sha256 size */
+#define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1)
#define block_size (1<<20)
-#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) //(transparency byte + max object len)
-#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) //magic + volume size
+#define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) /* transparency byte + max object len */
+#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value + volume size */
#define MF_OBJECT_EXIST (1 << 0)
#define MF_OBJECT_COPYING (1 << 1)
char *magic_string = "This a magic string. Please hash me";
-unsigned char magic_sha256[SHA256_DIGEST_SIZE];
-char zero_block[SHA256_DIGEST_SIZE * 2 + 1];
+unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
+char zero_block[SHA256_DIGEST_SIZE * 2 + 1]; /* hexlified sha256 hash value of a block full of zeros */
struct map_node {
- struct xlock lock;
uint32_t flags;
- uint32_t objectlen;
uint32_t objectidx;
- char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
- struct xq pending; /* pending peer_reqs on this object */
+ uint32_t objectlen;
+ char object[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
+ struct xq pending; /* pending peer_reqs on this object */
};
-#define MF_MAP_LOADING (1 << 0)
-#define MF_MAP_ABORT (1 << 1)
+#define MF_MAP_LOADING (1 << 0)
+#define MF_MAP_DESTROYED (1 << 1)
struct map {
uint32_t flags;
uint64_t size;
uint32_t volumelen;
char volume[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
- struct xlock lock;
- xhash_t *objects; // obj_index --> map_node
- struct xq pending; /* pending peer_reqs on this map */
+ xhash_t *objects; /* obj_index --> map_node */
+ struct xq pending; /* pending peer_reqs on this map */
};
struct mapperd {
xport bportno;
- struct xlock maps_lock;
xhash_t *hashmaps; // hash_function(target) --> struct map
};
struct mapper_io {
- struct xlock lock;
- volatile uint32_t copyups;
- xhash_t *copyups_nodes;
- int err;
+ volatile uint32_t copyups; /* nr of copyups pending, issued by this mapper io */
+ xhash_t *copyups_nodes; /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
+ int err; /* error flag */
};
+/*
+ * Helper functions
+ */
+
static inline struct mapperd * __get_mapperd(struct peerd *peer)
{
return (struct mapperd *) peer->priv;
return (struct mapper_io *) pr->priv;
}
-static struct map * __find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
+/*
+ * Number of block_size-sized objects needed to cover map->size bytes.
+ * A trailing partial block counts as one more object.
+ */
+static inline uint64_t calc_map_obj(struct map *map)
+{
+	uint64_t full_blocks = map->size / block_size;
+	uint64_t tail_bytes = map->size % block_size;
+
+	return tail_bytes ? full_blocks + 1 : full_blocks;
+}
+
+/*
+ * Count how many objects (block_size-sized pieces) the request req is
+ * split into, based on req->offset and req->size. Always returns at
+ * least 1.
+ *
+ * NOTE(review): a request no larger than block_size is always counted as
+ * a single object, even when offset + size crosses an object boundary.
+ * req2objs computes its first object size the same way - confirm that
+ * this is intended.
+ */
+static uint32_t calc_nr_obj(struct xseg_request *req)
+{
+	unsigned int r = 1;
+	uint64_t rem_size = req->size;
+	uint64_t obj_offset = req->offset & (block_size -1); //modulo
+	/* first object: the rest of its block, or the whole request when it is small */
+	uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size;
+
+	rem_size -= obj_size;
+	while (rem_size > 0) {
+		/*
+		 * Compare instead of subtracting: rem_size is unsigned, so
+		 * "rem_size - block_size > 0" was also true for a tail shorter
+		 * than a block, which then made rem_size wrap around below.
+		 */
+		obj_size = (rem_size > block_size) ? block_size : rem_size;
+		rem_size -= obj_size;
+		r++;
+	}
+
+	return r;
+}
+
+/*
+ * Maps handling functions
+ */
+
+static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
{
int r;
struct map *m = NULL;
r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
if (r < 0)
return NULL;
- print_map(m);
return m;
}
-static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
-{
- struct map *m = NULL;
-
- xlock_acquire(&mapper->maps_lock, 1);
- m = __find_map(mapper, target, targetlen);
- xlock_release(&mapper->maps_lock);
-
- return m;
-}
static int insert_map(struct mapperd *mapper, struct map *map)
{
int r = -1;
- xlock_acquire(&mapper->maps_lock, 1);
- if (__find_map(mapper, map->volume, map->volumelen)){
- printf("map found in insert map\n");
+ if (find_map(mapper, map->volume, map->volumelen)){
+ //printf("map found in insert map\n");
goto out;
}
r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
}
out:
- xlock_release(&mapper->maps_lock);
return r;
}
{
int r = -1;
- xlock_acquire(&mapper->maps_lock, 1);
//assert no pending pr on map
r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
}
out:
- xlock_release(&mapper->maps_lock);
return r;
}
xport p;
struct xseg_request *req;
struct mapperd *mapper = __get_mapperd(peer);
- //struct peerd *peer = __get_peerd(mapper);
void *dummy;
- printf("Loading map\n");
+ //printf("Loading map\n");
struct map *m = find_map(mapper, target, targetlen);
if (!m) {
goto out_map;
}
m->objects = xhash_new(3, INTEGER); //FIXME err_check;
+ if (!m->objects)
+ goto out_q;
__xq_append_tail(&m->pending, (xqindex) pr);
- xlock_release(&m->lock);
} else {
goto map_exists;
}
r = insert_map(mapper, m);
- if (r < 0) { // someone beat us (or resize error)
- xq_free(&m->pending);
- free(m);
- m = find_map(mapper, target, targetlen);
- if (!m)
- goto out_err;
- goto map_exists;
- }
- printf("Loading map: preparing req\n");
+ if (r < 0)
+ goto out_hash;
+
+ //printf("Loading map: preparing req\n");
req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC);
if (!req)
goto out_unset;
r = xseg_signal(peer->xseg, p);
- printf("Loading map: request issued\n");
+ //printf("Loading map: request issued\n");
return 0;
out_unset:
xseg_put_request(peer->xseg, req, peer->portno);
out_fail:
- //remove m from maps
- //fail pending reqs; //FIXME possible race if someone got map but not appended pr to pending yet
- // probably refcount with get/put will help
remove_map(mapper, m);
xqindex idx;
- while((idx = xq_pop_head(&m->pending, 1)) != Noneidx) {
+ while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
fail(peer, (struct peer_req *) idx);
}
-//out_q:
+
+out_hash:
+ xhash_free(m->objects);
+out_q:
xq_free(&m->pending);
out_map:
free(m);
return -1;
map_exists:
- xlock_acquire(&m->lock, 1);
+ //assert map loading when this is reached
if (m->flags & MF_MAP_LOADING) {
__xq_append_tail(&m->pending, (xqindex) pr);
- xlock_release(&m->lock);
}
else {
- xlock_release(&m->lock);
dispatch(peer, pr, pr->req);
}
return 0;
}
-#define MAP_LOADING 1
-static int find_or_load_map(struct peer* peer, struct peer_req *pr,
+static int find_or_load_map(struct peerd *peer, struct peer_req *pr,
char *target, uint32_t targetlen, struct map **m)
{
struct mapperd *mapper = __get_mapperd(peer);
int r;
- printf("find map or load\n");
+ //printf("find map or load\n");
*m = find_map(mapper, target, targetlen);
if (*m) {
- printf("map found\n");
- xlock_acquire(&((*m)->lock), 1);
+ //printf("map found\n");
if ((*m)->flags & MF_MAP_LOADING) {
__xq_append_tail(&(*m)->pending, (xqindex) pr);
- xlock_release(&((*m)->lock));
- printf("Map loading\n");
- return MAP_LOADING;
+ //printf("Map loading\n");
+ return MF_PENDING;
} else {
- printf("Map returned\n");
- xlock_release(&((*m)->lock));
+ //printf("Map returned\n");
return 0;
}
}
r = load_map(peer, pr, target, targetlen);
if (r < 0)
return -1; //error
- return MAP_LOADING;
+ return MF_PENDING;
}
-
+/*
+ * Object handling functions
+ */
struct map_node *find_object(struct map *map, uint64_t obj_index)
{
return r;
}
+
+/*
+ * map read/write functions
+ */
+/*
+ * Fill mn from one pithos map entry.
+ *
+ * buf points to SHA256_DIGEST_SIZE raw bytes; they are stored in
+ * mn->object as a NUL-terminated hex string (two digits per byte).
+ * mn->objectlen is set to the length of that string and mn->flags is
+ * cleared.
+ */
+static inline void pithosmap_to_object(struct map_node *mn, char *buf)
+{
+	int i;
+	//hexlify sha256 value
+	for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
+		/*
+		 * Each byte gets its own two-character slot. Convert through
+		 * unsigned char so that bytes >= 0x80 are not sign-extended
+		 * into more than two hex digits.
+		 */
+		sprintf(mn->object + 2*i, "%02x", (unsigned int) (unsigned char) buf[i]);
+	}
+
+	mn->object[XSEG_MAX_TARGET_LEN] = 0;
+	mn->objectlen = strlen(mn->object);
+	mn->flags = 0;
+}
+
+/*
+ * Decode one serialized map entry into mn.
+ *
+ * buf[0] is the leading flag byte: non-zero marks the object as existing.
+ * The following XSEG_MAX_TARGET_LEN bytes are copied as the object name,
+ * which is then NUL-terminated and measured.
+ */
+static inline void map_to_object(struct map_node *mn, char *buf)
+{
+	const char *name = buf + 1;
+
+	mn->flags = buf[0] ? MF_OBJECT_EXIST : 0;
+
+	memcpy(mn->object, name, XSEG_MAX_TARGET_LEN);
+	mn->object[XSEG_MAX_TARGET_LEN] = 0;
+	mn->objectlen = strlen(mn->object);
+}
+
static inline void object_to_map(char* buf, struct map_node *mn)
{
buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGET_LEN - mn->objectlen); //zero out the rest of the buffer
}
-static int object_write(struct peerd *peer, struct peer_req *pr,
- struct map *map, uint64_t objidx)
+/*
+ * Serialize the header of map m into buf: the magic sha256 value
+ * followed by the volume size. Writes
+ * SHA256_DIGEST_SIZE + sizeof(m->size) bytes.
+ */
+static inline void mapheader_to_map(struct map *m, char *buf)
+{
+	char *cursor = buf;
+
+	memcpy(cursor, magic_sha256, SHA256_DIGEST_SIZE);
+	cursor += SHA256_DIGEST_SIZE;
+	memcpy(cursor, &m->size, sizeof(m->size));
+}
+
+
+static int object_write(struct peerd *peer, struct peer_req *pr, struct map_node *mn)
{
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
- struct map_node *mn = find_object(map, objidx);
- if (!mn)
- goto out_err;
struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
mapper->bportno, X_ALLOC);
if (!req)
char *target = xseg_get_target(peer->xseg, req);
strncpy(target, mn->object, mn->objectlen);
req->size = objectsize_in_map;
- req->offset = mapheader_size + objidx * objectsize_in_map;
+ req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
req->op = X_WRITE;
char *data = xseg_get_data(peer->xseg, req);
object_to_map(data, mn);
goto out_unset;
r = xseg_signal(peer->xseg, p);
- return PENDING;
+ return MF_PENDING;
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
return -1;
}
-static inline void mapheader_to_map(struct map *m, char *buf)
-{
- uint64_t pos = 0;
- memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
- pos += SHA256_DIGEST_SIZE;
- memcpy(buf + pos, &m->size, sizeof(m->size));
- pos += sizeof(m->size);
-}
-
static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
{
void *dummy;
struct mapperd *mapper = __get_mapperd(peer);
struct map_node *mn;
- uint64_t i, pos, max_objidx = map->size / block_size;
+ uint64_t i, pos, max_objidx = calc_map_obj(map);
struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
mapper->bportno, X_ALLOC);
if (!req)
if (p == NoPort)
goto out_unset;
r = xseg_signal(peer->xseg, p);
- return PENDING;
+ return MF_PENDING;
out_unset:
xseg_get_req_data(peer->xseg, req, &dummy);
return -1;
}
+/*
+ * Parse a map block held in buf and populate map with its objects.
+ *
+ * Two layouts are recognised:
+ *  - our own format (buf starts with magic_sha256): the volume size
+ *    follows the magic value, then one objectsize_in_map-sized entry per
+ *    object;
+ *  - a pithos map: a plain sequence of raw sha256 values, ended by an
+ *    all-zero entry or after block_size/SHA256_DIGEST_SIZE entries. The
+ *    volume size is derived from the number of entries found.
+ *
+ * Returns 0 on success. Returns -1 when buf starts with an all-zero
+ * digest (treated as a read error), when the node array cannot be
+ * allocated, or when a node's pending queue or hash insertion fails.
+ */
+static int read_map (struct peerd *peer, struct map *map, char *buf)
+{
+	char nulls[SHA256_DIGEST_SIZE];
+	memset(nulls, 0, SHA256_DIGEST_SIZE);
+
+	if (!memcmp(buf, nulls, SHA256_DIGEST_SIZE)) {
+		//read error;
+		return -1;
+	}
+	//type 1, our type, type 0 pithos map
+	int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
+	uint64_t pos;
+	uint64_t i, nr_objs;
+	struct map_node *map_node;
+	if (type) {
+		pos = SHA256_DIGEST_SIZE;
+		/* copy the size out bytewise, mirroring mapheader_to_map:
+		 * buf + pos is a char pointer and need not be suitably
+		 * aligned for a uint64_t load */
+		memcpy(&map->size, buf + pos, sizeof(map->size));
+		pos += sizeof(map->size);
+		nr_objs = calc_map_obj(map);
+		/* NOTE(review): nr_objs is derived from the stored size and
+		 * the length of buf is not known here - confirm the caller
+		 * guarantees buf covers all nr_objs entries */
+		map_node = calloc(nr_objs, sizeof(struct map_node));
+		if (!map_node)
+			return -1;
+
+		for (i = 0; i < nr_objs; i++) {
+			map_node[i].objectidx = i;
+			if (!xq_alloc_empty(&map_node[i].pending, peer->nr_ops))
+				goto out_err;
+			map_to_object(&map_node[i], buf + pos);
+			pos += objectsize_in_map;
+			if (insert_object(map, &map_node[i]) < 0)
+				goto out_err;
+		}
+	} else {
+		pos = 0;
+		uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
+		map_node = calloc(max_nr_objs, sizeof(struct map_node));
+		if (!map_node)
+			return -1;
+		for (i = 0; i < max_nr_objs; i++) {
+			/* an all-zero digest marks the end of the pithos map */
+			if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
+				break;
+			map_node[i].objectidx = i;
+			if (!xq_alloc_empty(&map_node[i].pending, peer->nr_ops))
+				goto out_err;
+			pithosmap_to_object(&map_node[i], buf + pos);
+			pos += SHA256_DIGEST_SIZE;
+			if (insert_object(map, &map_node[i]) < 0)
+				goto out_err;
+		}
+		map->size = i * block_size;
+	}
+	return 0;
+
+out_err:
+	/*
+	 * FIXME cleanup on error: the node array is deliberately not freed
+	 * here, because nodes inserted before the failure are presumably
+	 * still referenced by the map's object hash. The caller is expected
+	 * to discard the whole map.
+	 */
+	return -1;
+}
+
+/*
+ * copy up functions
+ */
+
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
{
int r = 0;
return mn;
}
-// mn->lock held,
static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
{
struct mapperd *mapper = __get_mapperd(peer);
xport p;
struct sha256_ctx sha256ctx;
uint32_t newtargetlen;
- char new_target[XSEG_MAX_TARGET_LEN];
+ char new_target[XSEG_MAX_TARGET_LEN + 1];
unsigned char buf[SHA256_DIGEST_SIZE]; //assert sha256_digest_size(32) <= MAXTARGETLEN
- char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding
+ char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
strncpy(new_object, mn->object, mn->objectlen);
sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
+ new_object[XSEG_MAX_TARGET_LEN + 19] = 0;
/* calculate new object name */
sha256_finish_ctx(&sha256ctx, buf);
for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
sprintf (new_target + 2*i, "%02x", buf[i]);
- newtargetlen = SHA256_DIGEST_SIZE;
+ newtargetlen = SHA256_DIGEST_SIZE * 2;
struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
goto out_put;
char *target = xseg_get_target(peer->xseg, req);
- strncpy(target, mn->object, mn->objectlen);
- target[mn->objectlen] = 0;
+ strncpy(target, new_target, newtargetlen);
struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
- strncpy(xcopy->target, new_target, newtargetlen);
- xcopy->target[newtargetlen] = 0;
+ strncpy(xcopy->target, mn->object, mn->objectlen);
+ xcopy->target[mn->objectlen] = 0;
req->offset = 0;
req->size = block_size;
}
-static inline void pithosmap_to_object(struct map_node *mn, char *buf)
-{
- int i;
- //hexlify sha256 value
- for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
- sprintf(mn->object, "%02x", buf[i]);
- }
-
- mn->object[XSEG_MAX_TARGET_LEN] = 0;
- mn->objectlen = strlen(mn->object);
- mn->flags = 0;
-}
-
-static inline void map_to_object(struct map_node *mn, char *buf)
-{
- char c = buf[0];
- mn->flags = 0;
- if (c)
- mn->flags |= MF_OBJECT_EXIST;
- memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN);
- mn->object[XSEG_MAX_TARGET_LEN] = 0;
- mn->objectlen = strlen(mn->object);
-}
-
-
-static int read_map (struct peerd *peer, struct map *map, char *buf)
-{
- //type 1, our type, type 0 pithos map
- int r, type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
- uint64_t pos;
- uint64_t i, nr_objs;
- struct map_node *map_node;
- if (type) {
- pos = SHA256_DIGEST_SIZE;
- map->size = *(uint64_t *) (buf + pos);
- pos += sizeof(uint64_t);
- nr_objs = map->size / block_size;
- if (map->size % block_size)
- nr_objs++;
- map_node = calloc(nr_objs, sizeof(struct map_node));
- if (!map_node)
- return -1;
-
- for (i = 0; i < nr_objs; i++) {
- map_node[i].objectidx = i;
- xlock_release(&map_node[i].lock);
- xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
- map_to_object(&map_node[i], buf + pos);
- pos += objectsize_in_map;
- r = insert_object(map, &map_node[i]); //FIXME error check
- }
- } else {
- pos = 0;
- uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
- map_node = calloc(max_nr_objs, sizeof(struct map_node));
- if (!map_node)
- return -1;
- for (i = 0; i < max_nr_objs; i++) {
- if (!memcmp(buf+pos, "0000000000000000", SHA256_DIGEST_SIZE))
- break;
- map_node[i].objectidx = i;
- xlock_release(&map_node[i].lock);
- xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
- pithosmap_to_object(&map_node[i], buf + pos);
- pos += SHA256_DIGEST_SIZE;
- r = insert_object(map, &map_node[i]); //FIXME error check
- }
- map->size = i * block_size;
- }
- return 0;
-}
+/*
+ * request handling functions
+ */
static int handle_mapread(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
char *data = xseg_get_data(peer->xseg, req);
r = read_map(peer, map, data);
if (r < 0)
- goto out_err;
+ goto out_fail;
xseg_put_request(peer->xseg, req, peer->portno);
- xlock_acquire(&map->lock, 1);
map->flags &= ~MF_MAP_LOADING;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
- xlock_release(&map->lock);
dispatch(peer, preq, preq->req);
- xlock_acquire(&map->lock, 1);
}
- xlock_release(&map->lock);
return 0;
out_fail:
- xlock_acquire(&map->lock, 1);
+ xseg_put_request(peer->xseg, req, peer->portno);
map->flags &= ~MF_MAP_LOADING;
- map->flags |= MF_MAP_ABORT;
while((idx = __xq_pop_head(&map->pending)) != Noneidx){
struct peer_req *preq = (struct peer_req *) idx;
- xlock_release(&map->lock);
fail(peer, preq);
- xlock_acquire(&map->lock, 1);
}
- xlock_release(&map->lock);
remove_map(mapper, map);
free(map);
return 0;
out_err:
xseg_put_request(peer->xseg, req, peer->portno);
- goto out_fail;
+ return -1;
}
static int handle_clone(struct peerd *peer, struct peer_req *pr,
struct mapperd *mapper = __get_mapperd(peer);
struct mapper_io *mio = __get_mapper_io(pr);
(void) mio;
- printf("handle clone 1\n");
struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
if (!xclone) {
goto out_err;
}
- printf("handle clone 2\n");
struct map *map;
int r = find_or_load_map(peer, pr, xclone->target, strlen(xclone->target), &map);
if (r < 0)
goto out_err;
- else if (r == MAP_LOADING)
+ else if (r == MF_PENDING)
return 0;
+
+ if (map->flags & MF_MAP_DESTROYED) {
+ fail(peer, pr);
+ return 0;
+ }
- printf("handle clone 3\n");
//alloc and init struct map
struct map *clonemap = malloc(sizeof(struct map));
if (!clonemap) {
xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
if (!qidx)
goto out_err_objhash;
- xlock_release(&clonemap->lock);
clonemap->size = xclone->size;
clonemap->flags = 0;
char *target = xseg_get_target(peer->xseg, pr->req);
}
int i;
for (i = 0; i < xclone->size/block_size + 1; i++) {
- strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //FIXME copy object name from father
- map_nodes[i].objectlen = strlen(zero_block);
+ struct map_node *mn = find_object(map, i);
+ if (mn) {
+ strncpy(map_nodes[i].object, mn->object, mn->objectlen);
+ map_nodes[i].objectlen = mn->objectlen;
+ } else {
+ strncpy(map_nodes[i].object, zero_block, strlen(zero_block));
+ map_nodes[i].objectlen = strlen(zero_block);
+ }
map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
map_nodes[i].flags = 0;
map_nodes[i].objectidx = i;
- xlock_release(&map_nodes[i].lock);
- r = insert_object(map, &map_nodes[i]);
+ xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
+ r = insert_object(clonemap, &map_nodes[i]);
if (r < 0)
goto out_free_all;
}
//insert map
- printf("handle clone 4\n");
r = insert_map(mapper, clonemap);
if ( r < 0) {
- printf("handle clone 6\n");
goto out_free_all;
}
- printf("handle clone 5\n");
complete(peer, pr);
return 0;
return -1;
}
-static uint32_t calc_nr_obj(struct xseg_request *req)
-{
- unsigned int r = 1;
- uint64_t rem_size = req->size;
- uint64_t obj_offset = req->offset & (block_size -1); //modulo
- uint64_t obj_size = block_size - obj_offset;
- while (rem_size > 0) {
- obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
- rem_size -= obj_size;
- r++;
- }
-
- return r;
-}
-
static int req2objs(struct peerd *peer, struct peer_req *pr,
struct map *map, int write)
{
strncpy(buf, target, pr->req->targetlen);
int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
if (r < 0) {
+ printf("couldn't resize req\n");
return -1;
}
target = xseg_get_target(peer->xseg, pr->req);
uint64_t rem_size = pr->req->size;
uint64_t obj_index = pr->req->offset / block_size;
uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
- uint64_t obj_size = block_size - obj_offset;
+ uint64_t obj_size = (rem_size > block_size) ? block_size - obj_offset : rem_size;
struct map_node * mn = find_object(map, obj_index);
if (!mn) {
+ printf("coudn't find obj_index\n");
goto out_err;
}
- xlock_acquire(&mn->lock, 1);
- if (mn->flags & MF_OBJECT_COPYING)
+ if (write && mn->flags & MF_OBJECT_COPYING)
goto out_object_copying;
if (write && !(mn->flags & MF_OBJECT_EXIST)) {
//calc new_target, copy up object
r = copyup_object(peer, mn, pr);
- if (r < 0)
+ if (r < 0) {
+ printf("err_copy\n");
goto out_err_copy;
+ }
mn->flags |= MF_OBJECT_COPYING;
goto out_object_copying;
}
strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
reply->segs[idx].target[mn->objectlen] = 0;
- xlock_release(&mn->lock);
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
rem_size -= obj_size;
rem_size -= obj_size;
mn = find_object(map, obj_index);
if (!mn) {
+ printf("coudn't find obj_index\n");
goto out_err;
}
- xlock_acquire(&mn->lock, 1);
- if (mn->flags & MF_OBJECT_COPYING)
+ if (write && mn->flags & MF_OBJECT_COPYING)
goto out_object_copying;
if (write && !(mn->flags & MF_OBJECT_EXIST)) {
//calc new_target, copy up object
r = copyup_object(peer, mn, pr);
- if (r < 0)
+ if (r < 0) {
+ printf("err_copy\n");
goto out_err_copy;
+ }
mn->flags |= MF_OBJECT_COPYING;
goto out_object_copying;
}
strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
reply->segs[idx].target[mn->objectlen] = 0;
- xlock_release(&mn->lock);
reply->segs[idx].offset = obj_offset;
reply->segs[idx].size = obj_size;
}
return 0;
out_object_copying:
- __xq_append_tail(&mn->pending, (xqindex) pr);
- xlock_release(&mn->lock);
- return PENDING;
+ //printf("r2o mn: %lx\n", mn);
+ if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
+ printf("couldn't append pr to tail\n");
+ return MF_PENDING;
out_err_copy:
- xlock_release(&mn->lock);
out_err:
return -1;
}
fail(peer, pr);
return -1;
}
- else if (r == MAP_LOADING)
+ else if (r == MF_PENDING)
return 0;
+ if (map->flags & MF_MAP_DESTROYED) {
+ fail(peer, pr);
+ return 0;
+ }
+
//get_object
r = req2objs(peer, pr, map, 0);
- if (r < 0)
+ if (r < 0){
fail(peer, pr);
+ }
else if (r == 0)
complete(peer, pr);
(void) mapper;
struct mapper_io *mio = __get_mapper_io(pr);
int r = 0;
- xlock_acquire(&mio->lock, 1);
+ //printf("handle copyup reply\n");
if (req->state & XS_FAILED && !(req->state & XS_SERVED)) {
+ //printf("copy up failed\n");
mio->err = 1;
r = 1;
}
struct map_node *mn = __get_copyup_node(mio, req);
- if (!mn)
+ if (!mn){
+ //printf("copy up mn not found\n");
mio->err =1; //BUG
+ }
else {
- xlock_acquire(&mn->lock, 1);
+ //printf("mn: %lx\n", mn);
mn->flags &= ~MF_OBJECT_COPYING;
- mn->flags |= MF_OBJECT_EXIST;
if (!r) {
- struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
- strncpy(mn->object, xcopy->target, strlen(xcopy->target));
+ mn->flags |= MF_OBJECT_EXIST;
+ char *target = xseg_get_target(peer->xseg, req);
+ strncpy(mn->object, target, req->targetlen);
}
- xlock_release(&mn->lock);
}
__set_copyup_node(mio, req, NULL);
xseg_put_request(peer->xseg, req, peer->portno);
mio->copyups--;
- if (!mio->copyups) {
- if (mio->err)
- fail(peer, pr);
- else
- complete(peer, pr);
- }
- xlock_release(&mio->lock);
-
if (mn) {
//handle peer_requests waiting on copy up
xqindex idx;
- xlock_acquire(&mn->lock, 1);
- while ((idx = __xq_pop_head(&mn->pending) != Noneidx)){
- xlock_release(&mn->lock);
+ //printf("foo\n");
+ while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
+ //printf("dispatching pending\n");
struct peer_req * preq = (struct peer_req *) idx;
dispatch(peer, preq, preq->req);
- xlock_acquire(&mn->lock, 1);
}
- xlock_release(&mn->lock);
}
return 0;
fail(peer, pr);
return -1;
}
- else if (r == MAP_LOADING)
+ else if (r == MF_PENDING)
return 0;
+
+ if (map->flags & MF_MAP_DESTROYED) {
+ printf("map MF_MAP_DESTROYED req %lx\n", pr->req);
+ fail(peer, pr);
+ return 0;
+ }
+ if (mio->err) {
+ //printf("mapw failed\n");
+ fail(peer, pr);
+ return 0;
+ }
+ //printf("handle mapw\n");
-
+ mio->err = 0;
r = req2objs(peer, pr, map, 1);
- if (r < 0)
+ if (r < 0){
+ printf("req2obj returned r < 0 for req %lx\n", pr->req);
fail(peer, pr);
+ }
if (r == 0)
complete(peer, pr);
//else copyup pending, wait for pr restart
struct mapper_io *mio = __get_mapper_io(pr);
(void) mio;
char *target = xseg_get_target(peer->xseg, pr->req);
- if (!target)
- return -1;
- printf("Handle info\n");
+ if (!target) {
+ fail(peer, pr);
+ return 0;
+ }
+ //printf("Handle info\n");
struct map *map;
int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
if (r < 0) {
fail(peer, pr);
return -1;
}
- else if (r == MAP_LOADING)
+ else if (r == MF_PENDING)
+ return 0;
+ if (map->flags & MF_MAP_DESTROYED) {
+ fail(peer, pr);
return 0;
- else {
- struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
- xinfo->size = map->size;
- complete(peer, pr);
}
+
+ struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
+ xinfo->size = map->size;
+ complete(peer, pr);
+
return 0;
}
static int handle_destroy(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
+ /*
+ struct map *map;
+ int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
+ if (r < 0) {
+ fail(peer, pr);
+ return -1;
+ }
+ else if (r == MF_PENDING)
+ return 0;
+ map->flags |= MF_MAP_DESTROYED;
+ */
+ //delete map block
+ //do not delete all objects
+ //remove_map(mapper, map);
+ //free(map, map_nodes, all allocated resources);
+ //complete(peer, pr);
fail(peer, pr);
return 0;
}
unsigned char buf[SHA256_DIGEST_SIZE];
char *zero;
struct sha256_ctx sha256ctx;
+ /* calculate our magic sha hash value */
sha256_init_ctx(&sha256ctx);
sha256_process_bytes(magic_string, strlen(magic_string), &sha256ctx);
sha256_finish_ctx(&sha256ctx, magic_sha256);
+ /* calculate zero block */
+ //FIXME check hash value
zero = malloc(block_size);
memset(zero, 0, block_size);
sha256_init_ctx(&sha256ctx);
sprintf(zero_block + 2*i, "%02x", buf[i]);
printf("%s \n", zero_block);
+ //FIXME error checks
struct mapperd *mapper = malloc(sizeof(struct mapperd));
- xlock_release(&mapper->maps_lock);
mapper->hashmaps = xhash_new(3, STRING);
peer->priv = mapper;
for (i = 0; i < peer->nr_ops; i++) {
struct mapper_io *mio = malloc(sizeof(struct mapper_io));
- xlock_release(&mio->lock);
mio->copyups_nodes = xhash_new(3, INTEGER);
mio->copyups = 0;
mio->err = 0;
i += 1;
continue;
}
+ /* enforce only one thread */
+ if (!strcmp(argv[i], "-t") && (i+1) < argc){
+ int t = atoi(argv[i+1]);
+ if (t != 1) {
+ printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
+ return -1;
+ }
+ i += 1;
+ continue;
+ }
}
- test_map(peer);
+ //test_map(peer);
return 0;
}
m->volume, m->volumelen, m->size, nr_objs);
uint64_t i;
struct map_node *mn;
- if (nr_objs > 1000000)
+ if (nr_objs > 1000000) //FIXME to protect against invalid volume size
return;
for (i = 0; i < nr_objs; i++) {
mn = find_object(m, i);
- if (!mn)
+ if (!mn){
+ printf("object idx [%llu] not found!\n", i);
continue;
+ }
print_obj(mn);
}
}