Add the ability to hash snapshots and create Pithos-compatible images.
As a minor improvement, also add an object_to_map method to the per-version map functions table.
st_cond_t req_cond;
char buf[XSEG_MAX_TARGETLEN + 1];
+
+
char * null_terminate(char *target, uint32_t targetlen)
{
if (targetlen > XSEG_MAX_TARGETLEN)
{
int r;
map->state |= MF_MAP_WRITING;
+ struct mapper_io *mio = __get_mapper_io(pr);
+
+ mio->cb = NULL;
+ mio->err = 0;
+
r = map_functions[map->version].write_map_metadata(pr, map);
if (r < 0)
goto out;
struct map_node *mn)
{
struct peerd *peer = pr->peer;
- struct mapper_io *mio = __get_mapper_io(pr);
struct map_node tmp;
char *data;
XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
return NULL;
}
-void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
+#endif
+
+/*
+ * hash_cb: completion callback for the X_HASH requests issued by
+ * __hash_map. On success, the hexlified hash carried in the reply becomes
+ * the map node's new object name; any failure is recorded in mio->err.
+ */
+void hash_cb(struct peer_req *pr, struct xseg_request *req)
{
-	struct peerd *peer = pr->peer;
-	struct mapperd *mapper = __get_mapperd(peer);
-	(void)mapper;
	struct mapper_io *mio = __get_mapper_io(pr);
+	struct peerd *peer = pr->peer;
	struct map_node *mn = __get_node(mio, req);
-	if (!mn){
-		XSEGLOG2(&lc, E, "Cannot get map node");
-		goto out_err;
-	}
-	__set_node(mio, req, NULL);
+	struct xseg_reply_hash *xreply;
-	if (req->state & XS_FAILED){
-		if (req->op == X_DELETE){
-			XSEGLOG2(&lc, E, "Delete req failed");
-			goto out_ok;
-		}
-		XSEGLOG2(&lc, E, "Req failed");
-		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
-		mn->flags &= ~MF_OBJECT_WRITING;
-		goto out_err;
+	XSEGLOG2(&lc, I, "Callback of req %p", req);
+
+	if (!mn) {
+		XSEGLOG2(&lc, E, "Cannot get mapnode");
+		mio->err = 1;
+		goto out_nonode;
	}
-	if (req->op == X_WRITE) {
-		char old_object_name[MAX_OBJECT_LEN + 1];
-		uint32_t old_objectlen;
-
-		char *target = xseg_get_target(peer->xseg, req);
-		(void)target;
-		//assert mn->flags & MF_OBJECT_WRITING
-		mn->flags &= ~MF_OBJECT_WRITING;
-		strncpy(old_object_name, mn->object, mn->objectlen);
-		old_objectlen = mn->objectlen;
-
-		struct map_node tmp;
-		char *data = xseg_get_data(peer->xseg, req);
-		map_functions[mn->map->version].read_object(&tmp, (unsigned char *)data);
-		mn->flags &= ~MF_OBJECT_WRITABLE;
-
-		strncpy(mn->object, tmp.object, tmp.objectlen);
-		mn->object[tmp.objectlen] = 0;
-		mn->objectlen = tmp.objectlen;
-		XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
-		//signal_mapnode since Snapshot was successfull
-		signal_mapnode(mn);
+	if (req->state & XS_FAILED) {
+		mio->err = 1;
+		XSEGLOG2(&lc, E, "Request failed");
+		goto out;
+	}
-		//do delete old object
-		strncpy(tmp.object, old_object_name, old_objectlen);
-		tmp.object[old_objectlen] = 0;
-		tmp.objectlen = old_objectlen;
-		tmp.flags = MF_OBJECT_WRITABLE;
-		struct xseg_request *xreq = __delete_object(pr, &tmp);
-		if (!xreq){
-			//just a warning. Snapshot was successfull
-			XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
-			goto out_ok;
-		}
-		//overwrite copyup node, since tmp is a stack dummy variable
-		__set_node (mio, xreq, mn);
-		XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
-	} else if (req->op == X_SNAPSHOT) {
-		//issue write_object;
-		mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
-		struct map *map = mn->map;
-		if (!map){
-			XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
-			goto out_err;
-		}
+	if (req->serviced != req->size) {
+		mio->err = 1;
+		XSEGLOG2(&lc, E, "Serviced != size");
+		goto out;
+	}
-		/* construct a tmp map_node for writing purposes */
-		//char *target = xseg_get_target(peer->xseg, req);
-		struct map_node newmn = *mn;
-		newmn.flags = 0;
-		struct xseg_reply_snapshot *xreply;
-		xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
-		//assert xreply->targetlen !=0
-		//assert xreply->targetlen < XSEG_MAX_TARGETLEN
-		//xreply->target[xreply->targetlen] = 0;
-		//assert xreply->target valid
-		strncpy(newmn.object, xreply->target, xreply->targetlen);
-		newmn.object[req->targetlen] = 0;
-		newmn.objectlen = req->targetlen;
-		newmn.objectidx = mn->objectidx;
-		struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
-		if (!xreq){
-			XSEGLOG2(&lc, E, "Object write returned error for object %s"
-					"\n\t of map %s [%llu]",
-					mn->object, map->volume, (unsigned long long) mn->objectidx);
-			goto out_err;
-		}
-		mn->flags |= MF_OBJECT_WRITING;
-		__set_node (mio, xreq, mn);
-
-		XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
-	} else if (req->op == X_DELETE){
-		//deletion of the old block completed
-		XSEGLOG2(&lc, I, "Deletion of completed");
-		goto out_ok;
-		;
-	} else {
-		//wtf??
-		;
+	xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, req);
+	if (xreply->targetlen != HEXLIFIED_SHA256_DIGEST_SIZE) {
+		XSEGLOG2(&lc, E, "Reply targetlen != HEXLIFIED_SHA256_DIGEST_SIZE");
+		mio->err =1;
+		goto out;
	}
+	/* install the hexlified hash as the node's new object name */
+	strncpy(mn->object, xreply->target, HEXLIFIED_SHA256_DIGEST_SIZE);
+	mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+	mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
+	XSEGLOG2(&lc, D, "Received hash object %llu: %s (%p)",
+			mn->objectidx, mn->object, mn);
+	/* NOTE(review): drops all object flags (e.g. MF_OBJECT_ARCHIP) --
+	 * presumably marks the hashed name as plain pithos data; confirm */
+	mn->flags = 0;
+
out:
+	/* drop the node reference and detach it from the request */
+	put_mapnode(mn);
+	__set_node(mio, req, NULL);
+out_nonode:
	put_request(pr, req);
+	mio->pending_reqs--;
+	signal_pr(pr);
	return;
+}
-out_err:
-	mio->snap_pending--;
-	XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
-	mio->err = 1;
-	if (mn)
-		signal_mapnode(mn);
-	signal_pr(pr);
-	goto out;
-out_ok:
-	mio->snap_pending--;
-	signal_pr(pr);
-	goto out;
+/*
+ * __hash_map: issue an X_HASH request for every MF_OBJECT_ARCHIP object
+ * of map and collect the hashed names into the matching nodes of
+ * hashed_map. Non-archip objects are copied verbatim (no request sent).
+ * Completions are serviced by hash_cb; the caller must wait until
+ * mio->pending_reqs drains.
+ *
+ * Returns 0 when every object was dispatched or copied, -1 on error.
+ */
+int __hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map)
+{
+	struct mapperd *mapper = __get_mapperd(pr->peer);
+	struct mapper_io *mio = __get_mapper_io(pr);
+	uint64_t i;
+	struct map_node *mn, *hashed_mn;
+	struct xseg_request *req;
+	int r;
+	/* mio->priv counts objects copied without hashing -- TODO confirm
+	 * who consumes this counter */
+	mio->priv = 0;
+	for (i = 0; i < map->nr_objs; i++) {
+		mn = get_mapnode(map, i);
+		if (!mn) {
+			/* adjacent string literals concatenate into one format
+			 * string; a comma here would pass the second literal as
+			 * the %llu vararg and shift all remaining arguments */
+			XSEGLOG2(&lc, E, "Cannot get mapnode %llu of map %s "
+					"(nr_objs: %llu)", i, map->volume,
+					map->nr_objs);
+			return -1;
+		}
+		hashed_mn = get_mapnode(hashed_map, i);
+		if (!hashed_mn) {
+			XSEGLOG2(&lc, E, "Cannot get mapnode %llu of map %s "
+					"(nr_objs: %llu)", i, hashed_map->volume,
+					hashed_map->nr_objs);
+			put_mapnode(mn);
+			return -1;
+		}
+		if (!(mn->flags & MF_OBJECT_ARCHIP)) {
+			/* non-archip object: keep its name and flags as-is */
+			mio->priv++;
+			strncpy(hashed_mn->object, mn->object, mn->objectlen);
+			hashed_mn->objectlen = mn->objectlen;
+			hashed_mn->object[hashed_mn->objectlen] = 0;
+			hashed_mn->flags = mn->flags;
+
+			put_mapnode(mn);
+			put_mapnode(hashed_mn);
+			continue;
+		}
+
+		req = get_request(pr, mapper->bportno, mn->object,
+				mn->objectlen, 0);
+		if (!req){
+			XSEGLOG2(&lc, E, "Cannot get request for map %s",
+					map->volume);
+			put_mapnode(mn);
+			put_mapnode(hashed_mn);
+			return -1;
+		}
+
+		req->op = X_HASH;
+		req->offset = 0;
+		req->size = map->blocksize;
+		/* hash_cb fills hashed_mn and drops this node reference */
+		r = __set_node(mio, req, hashed_mn);
+		if (r < 0) {
+			XSEGLOG2(&lc, E, "Cannot set node");
+			put_request(pr, req);
+			put_mapnode(mn);
+			put_mapnode(hashed_mn);
+			return -1;
+		}
+
+		r = send_request(pr, req);
+		if (r < 0) {
+			XSEGLOG2(&lc, E, "Cannot send request %p, pr: %p, map: %s",
+					req, pr, map->volume);
+			put_request(pr, req);
+			__set_node(mio, req, NULL);
+			put_mapnode(mn);
+			put_mapnode(hashed_mn);
+			return -1;
+		}
+		mio->pending_reqs++;
+		put_mapnode(mn);
+	}
+
+	return 0;
}
-#endif
+/* hash_map: drive __hash_map over a whole map and wait for completion. */
+int hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map)
+{
+	struct mapper_io *mio = __get_mapper_io(pr);
+	int ret;
+
+	XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
+	/* mark the map busy and reset the per-request io context */
+	map->state |= MF_MAP_HASHING;
+	mio->pending_reqs = 0;
+	mio->cb = hash_cb;
+	mio->err = 0;
+
+	ret = __hash_map(pr, map, hashed_map);
+	if (ret < 0)
+		mio->err = 1;
+
+	/* let hash_cb drain any X_HASH requests still in flight */
+	if (mio->pending_reqs)
+		wait_on_pr(pr, mio->pending_reqs > 0);
+
+	mio->cb = NULL;
+	map->state &= ~MF_MAP_HASHING;
+
+	if (!mio->err) {
+		XSEGLOG2(&lc, I, "Hashing map %s completed", map->volume);
+		return 0;
+	}
+	XSEGLOG2(&lc, E, "Hashing map %s failed", map->volume);
+	return -1;
+}
return 0;
}
-void v0_object_to_map(unsigned char *data, struct map_node *mn)
+void object_to_map_v0(unsigned char *data, struct map_node *mn)
{
unhexlify(mn->object, data);
//if name == zero block, raize MF_OBJECT_ZERO
req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
data = xseg_get_data(pr->peer->xseg, req);
- v0_object_to_map((unsigned char *)data, mn);
+ object_to_map_v0((unsigned char *)data, mn);
return req;
}
pos = 0;
for (i = 0; i < map->nr_objs; i++) {
mn = &map->objects[i];
- v0_object_to_map((unsigned char *)(data+pos), mn);
+ object_to_map_v0((unsigned char *)(data+pos), mn);
pos += v0_objectsize_in_map;
}
#define v0_objectsize_in_map (SHA256_DIGEST_SIZE)
int read_object_v0(struct map_node *mn, unsigned char *buf);
-//void v0_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v0(unsigned char* buf, struct map_node *mn);
struct xseg_request * prepare_write_object_v0(struct peer_req *pr,
struct map *map, struct map_node *mn);
//int read_map_v0(struct map *m, unsigned char * data);
/*.read_map = read_map_v0, \*/
#define map_functions_v0 { \
+ .object_to_map = object_to_map_v0, \
.read_object = read_object_v0, \
.prepare_write_object = prepare_write_object_v0,\
.load_map_data = load_map_data_v0, \
return 0;
}
-void v1_object_to_map(unsigned char* buf, struct map_node *mn)
+void object_to_map_v1(unsigned char* buf, struct map_node *mn)
{
buf[0] = (mn->flags & MF_OBJECT_WRITABLE)? 1 : 0;
//assert !(mn->flags & MF_OBJECT_ARCHIP)
req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
data = xseg_get_data(pr->peer->xseg, req);
- v1_object_to_map((unsigned char *)data, mn);
+ object_to_map_v1((unsigned char *)data, mn);
return NULL;
}
pos = 0;
for (i = 0; i < map->nr_objs; i++) {
mn = &map->objects[i];
- v1_object_to_map((unsigned char *)(data+pos), mn);
+ object_to_map_v1((unsigned char *)(data+pos), mn);
pos += v1_objectsize_in_map;
}
#define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
int read_object_v1(struct map_node *mn, unsigned char *buf);
-//void v1_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v1(unsigned char* buf, struct map_node *mn);
struct xseg_request * prepare_write_object_v1(struct peer_req *pr,
struct map *map, struct map_node *mn);
//int read_map_v1(struct map *m, unsigned char * data);
/*.read_map = read_map_v1, \*/
#define map_functions_v1 { \
+ .object_to_map = object_to_map_v1, \
.read_object = read_object_v1, \
.prepare_write_object = prepare_write_object_v1,\
.load_map_data = load_map_data_v1, \
/* v2 functions */
-int initialize_map_objects(struct map *map)
-{
- uint64_t i;
- struct map_node *map_node = map->objects;
-
- if (!map_node)
- return -1;
-
- for (i = 0; i < map->nr_objs; i++) {
- map_node[i].map = map;
- map_node[i].objectidx = i;
- map_node[i].waiters = 0;
- map_node[i].state = 0;
- map_node[i].ref = 1;
- map_node[i].cond = st_cond_new(); //FIXME err check;
- }
- return 0;
-}
-
-uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
+static uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
{
uint32_t targetlen;
char buf[sizeof(block_id)*2 + 1];
uint32_t len;
};
-struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
+static struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
{
struct obj2chunk ret;
uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
return 0;
}
-void v2_object_to_map(unsigned char* buf, struct map_node *mn)
+void object_to_map_v2(unsigned char* buf, struct map_node *mn)
{
uint32_t *objectlen;
uint32_t len;
pos = 0;
for (k = o2c.start_obj; k < limit; k++) {
mn = &map->objects[k];
- v2_object_to_map((unsigned char *)(data+pos), mn);
+ object_to_map_v2((unsigned char *)(data+pos), mn);
pos += v2_objectsize_in_map;
}
return NULL;
data = xseg_get_data(peer->xseg, req);
- v2_object_to_map((unsigned char *)(data), mn);
+ object_to_map_v2((unsigned char *)(data), mn);
return req;
}
}
}
+ map_node = map->objects;
+
for (i = start; i < nr; i++) {
r = read_object_v2(&map_node[i], data+pos);
if (r < 0) {
pos = 0;
for (k = start; k < map->nr_objs && k < limit; k++) {
mn = &map->objects[k];
- v2_object_to_map((unsigned char *)(data+pos), mn);
+ object_to_map_v2((unsigned char *)(data+pos), mn);
pos += v2_objectsize_in_map;
}
sizeof(uint64_t))
int read_object_v2(struct map_node *mn, unsigned char *buf);
-//void v2_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v2(unsigned char* buf, struct map_node *mn);
struct xseg_request * prepare_write_object_v2(struct peer_req *pr,
struct map *map, struct map_node *mn);
//int read_map_v2(struct map *m, unsigned char * data);
/*.read_map = read_map_v2, \*/
#define map_functions_v2 { \
.read_object = read_object_v2, \
+ .object_to_map = object_to_map_v2, \
.prepare_write_object = prepare_write_object_v2,\
.load_map_data = load_map_data_v2, \
.write_map_metadata = write_map_metadata_v2, \
*/
struct map_functions {
+ void (*object_to_map)(unsigned char *buf, struct map_node *mn);
int (*read_object)(struct map_node *mn, unsigned char *buf);
struct xseg_request * (*prepare_write_object)(struct peer_req *pr,
struct map *map, struct map_node *mn);
return r;
}
-static inline struct map_node * get_mapnode(struct map *map, uint64_t index)
+inline struct map_node * get_mapnode(struct map *map, uint64_t index)
{
struct map_node *mn;
if (index >= map->nr_objs) {
}
mn = &map->objects[index];
mn->ref++;
+ XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);
return mn;
}
-static inline void put_mapnode(struct map_node *mn)
+inline void put_mapnode(struct map_node *mn)
{
mn->ref--;
+ XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);
if (!mn->ref){
//clean up mn
st_cond_destroy(mn->cond);
}
}
+/* initialize_map_objects: reset every node of the map's object table to a
+ * fresh state (back-pointer, index, single reference, new condvar). */
+int initialize_map_objects(struct map *map)
+{
+	struct map_node *node = map->objects;
+	uint64_t idx;
+
+	/* nothing to initialize without an objects array */
+	if (!node)
+		return -1;
+
+	for (idx = 0; idx < map->nr_objs; idx++, node++) {
+		node->map = map;
+		node->objectidx = idx;
+		node->waiters = 0;
+		node->state = 0;
+		node->ref = 1;
+		node->cond = st_cond_new(); //FIXME err check;
+	}
+	return 0;
+}
+
+
+
static inline void __get_map(struct map *map)
{
map->ref++;
return dropcache(pr, map);
}
-/* do_hash:
- *
- * send hash, overwrite object to the map, and do one write for the whole map.
- */
-#if 0
+/* do_hash:
+ *
+ * Hash a read-only (snapshot) map into a pithos-compatible image:
+ * hash every object into hashed_map, serialize hashed_map in v0 format,
+ * merkle-hash that buffer to derive the new volume name, write the
+ * hashed map and return the new name in the request reply.
+ */
static int do_hash(struct peer_req *pr, struct map *map)
{
-	uint64_t i;
+	int r;
	struct peerd *peer = pr->peer;
-	struct mapper_io *mio = __get_mapper_io(pr);
+	uint64_t i, bufsize;
+	struct map *hashed_map;
+	unsigned char sha[SHA256_DIGEST_SIZE];
+	unsigned char *buf = NULL;
+	char newvolumename[MAX_VOLUME_LEN];
+	uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
+	uint64_t pos = 0;
+	char targetbuf[XSEG_MAX_TARGETLEN];
+	char *target;
+	struct xseg_reply_hash *xreply;
	struct map_node *mn;
-	struct xseg_request *req;
-	if (!(map->state & MF_MAP_EXCLUSIVE)){
-		XSEGLOG2(&lc, E, "Map was not opened exclusively");
+	if (!(map->flags & MF_MAP_READONLY)) {
+		XSEGLOG2(&lc, E, "Cannot hash live volumes");
		return -1;
	}
-	XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
-	map->state |= MF_MAP_SNAPSHOTTING;
-
-	uint64_t nr_obj = calc_map_obj(map);
-	mio->cb = snapshot_cb;
-	mio->snap_pending = 0;
-	mio->err = 0;
-	for (i = 0; i < nr_obj; i++){
-		/* throttle pending snapshots
-		 * this should be nr_ops of the blocker, but since we don't know
-		 * that, we assume based on our own nr_ops
-		 */
-		if (mio->snap_pending >= peer->nr_ops)
-			wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
+	XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
+	/* prepare hashed_map holder */
+	hashed_map = create_map("", 0, 0);
+	if (!hashed_map) {
+		XSEGLOG2(&lc, E, "Cannot create hashed map");
+		return -1;
+	}
-		mn = get_mapnode(map, i);
-		if (!mn)
-			//warning?
-			continue;
-		if (!(mn->flags & MF_OBJECT_WRITABLE)){
-			put_mapnode(mn);
-			continue;
-		}
-		// make sure all pending operations on all objects are completed
-		if (mn->flags & MF_OBJECT_NOT_READY)
-			wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+	/* set map metadata */
+	hashed_map->size = map->size;
+	hashed_map->nr_objs = map->nr_objs;
+	hashed_map->flags = MF_MAP_READONLY;
+	hashed_map->blocksize = MAPPER_DEFAULT_BLOCKSIZE; /* FIXME, this should be PITHOS_BLOCK_SIZE right? */
-		/* TODO will this ever happen?? */
-		if (mn->flags & MF_OBJECT_DESTROYED){
-			put_mapnode(mn);
-			continue;
-		}
-
-		req = __snapshot_object(pr, mn);
-		if (!req){
-			mio->err = 1;
-			put_mapnode(mn);
-			break;
-		}
-		mio->snap_pending++;
-		/* do not put_mapnode here. cb does that */
+	/* allocate the hashed map's object table */
+	hashed_map->objects = calloc(map->nr_objs, sizeof(struct map_node));
+	if (!hashed_map->objects) {
+		XSEGLOG2(&lc, E, "Cannot allocate memory for %llu nr_objs",
+				hashed_map->nr_objs);
+		r = -1;
+		goto out;
	}
-	if (mio->snap_pending > 0)
-		wait_on_pr(pr, mio->snap_pending > 0);
-	mio->cb = NULL;
+	r = initialize_map_objects(hashed_map);
+	if (r < 0) {
+		XSEGLOG2(&lc, E, "Cannot initialize hashed_map objects");
+		goto out;
+	}
-	if (mio->err)
-		goto out_err;
+	r = hash_map(pr, map, hashed_map);
+	if (r < 0) {
+		XSEGLOG2(&lc, E, "Cannot hash map %s", map->volume);
+		goto out;
+	}
-	/* calculate name of snapshot */
-	struct map tmp_map = *map;
-	unsigned char sha[SHA256_DIGEST_SIZE];
-	unsigned char *buf = malloc(MAPPER_DEFAULT_BLOCKSIZE);
-	char newvolumename[MAX_VOLUME_LEN];
-	uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
-	uint64_t pos = 0;
-	uint64_t max_objidx = calc_map_obj(map);
-	int r;
+	bufsize = hashed_map->nr_objs * v0_objectsize_in_map;
-	for (i = 0; i < max_objidx; i++) {
-		mn = find_object(map, i);
+	buf = malloc(bufsize);
+	if (!buf) {
+		XSEGLOG2(&lc, E, "Cannot allocate merkle_hash buffer of %llu bytes",
+				bufsize);
+		/* r still holds 0 from hash_map here; without this the
+		 * function would report success on allocation failure */
+		r = -1;
+		goto out;
+	}
+	/* serialize the hashed objects in v0 format for merkle hashing */
+	for (i = 0; i < hashed_map->nr_objs; i++) {
+		mn = get_mapnode(hashed_map, i);
		if (!mn){
-			XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
-					(unsigned long long) i, map->volume);
-			goto out_err;
+			XSEGLOG2(&lc, E, "Cannot get object %llu for map %s",
+					i, hashed_map->volume);
+			/* flag the failure before bailing out */
+			r = -1;
+			goto out;
		}
-		v0_object_to_map(mn, buf+pos);
+		map_functions[0].object_to_map(buf+pos, mn);
		pos += v0_objectsize_in_map;
+		put_mapnode(mn);
	}
-//	SHA256(buf, pos, sha);
+
	merkle_hash(buf, pos, sha);
-	hexlify(sha, newvolumename);
-	strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
-	tmp_map.volumelen = newvolumenamelen;
-	free(buf);
-	tmp_map.version = 0; // set volume version to pithos image
-
-	/* write the map of the Snapshot */
-	r = write_map(pr, &tmp_map);
-	if (r < 0)
-		goto out_err;
-	char targetbuf[XSEG_MAX_TARGETLEN];
-	char *target = xseg_get_target(peer->xseg, pr->req);
+	/* the merkle hash of the serialized map becomes the new volume name */
+	hexlify(sha, SHA256_DIGEST_SIZE, newvolumename);
+	strncpy(hashed_map->volume, newvolumename, newvolumenamelen);
+	hashed_map->volume[newvolumenamelen] = 0;
+	hashed_map->volumelen = newvolumenamelen;
+
+	/* write the hashed_map */
+	r = write_map(pr, hashed_map);
+	if (r < 0) {
+		XSEGLOG2(&lc, E, "Cannot write hashed_map %s", hashed_map->volume);
+		goto out;
+	}
+
+	/* Resize request to fit xhash reply */
+	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(targetbuf, target, pr->req->targetlen);
+
	r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
-			sizeof(struct xseg_reply_snapshot));
+			sizeof(struct xseg_reply_hash));
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot resize request");
-		goto out_err;
+		goto out;
	}
+
	target = xseg_get_target(peer->xseg, pr->req);
	strncpy(target, targetbuf, pr->req->targetlen);
-	struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
-		xseg_get_data(peer->xseg, pr->req);
+	/* Put the target of the hashed_map on the reply */
+	xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, pr->req);
	strncpy(xreply->target, newvolumename, newvolumenamelen);
	xreply->targetlen = newvolumenamelen;
-	map->state &= ~MF_MAP_SNAPSHOTTING;
-	XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
-	return 0;
-out_err:
-	map->state &= ~MF_MAP_SNAPSHOTTING;
-	XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
-	return -1;
+out:
+	/* free(NULL) is a no-op, so no guard needed */
+	free(buf);
+	put_map(hashed_map);
+	if (r < 0) {
+		return -1;
+	} else {
+		return 0;
+	}
}
-#endif
-
static int do_snapshot(struct peer_req *pr, struct map *map)
{
return NULL;
}
+/* handle_hash: dispatch entry for X_HASH requests. */
+void * handle_hash(struct peer_req *pr)
+{
+	struct peerd *peer = pr->peer;
+	char *target = xseg_get_target(peer->xseg, pr->req);
+	int ret;
+
+	/* No exclusive access requested: only read-only snapshots are
+	 * hashed, so there is nothing to lock against.
+	 */
+	ret = map_action(do_hash, pr, target, pr->req->targetlen,
+			MF_ARCHIP|MF_LOAD);
+	if (ret >= 0)
+		complete(peer, pr);
+	else
+		fail(peer, pr);
+	ta--;
+	return NULL;
+}
+
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
struct xseg_request *req)
{
case X_DELETE: action = handle_destroy; break;
case X_OPEN: action = handle_open; break;
case X_CLOSE: action = handle_close; break;
- default: fprintf(stderr, "mydispatch: unknown up\n"); break;
+ case X_HASH: action = handle_hash; break;
+ default: fprintf(stderr, "mydispatch: unknown op\n"); break;
}
if (action){
ta++;
//#define MF_MAP_DELETED (1 << 8)
#define MF_MAP_SNAPSHOTTING (1 << 9)
#define MF_MAP_SERIALIZING (1 << 10)
+#define MF_MAP_HASHING (1 << 11)
#define MF_MAP_NOT_READY (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
MF_MAP_DROPPING_CACHE|MF_MAP_OPENING| \
- MF_MAP_SNAPSHOTTING|MF_MAP_SERIALIZING)
+ MF_MAP_SNAPSHOTTING|MF_MAP_SERIALIZING| \
+ MF_MAP_HASHING)
struct map {
uint32_t version;
void put_request(struct peer_req *pr, struct xseg_request *req);
struct xseg_request * __load_map_metadata(struct peer_req *pr, struct map *map);
int load_map_metadata(struct peer_req *pr, struct map *map);
-
+int initialize_map_objects(struct map *map);
+int hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map);
+struct map_node * get_mapnode(struct map *map, uint64_t objindex);
+void put_mapnode(struct map_node *mn);
#endif /* end MAPPER_H */