mapper: Add hash support for snapshots.
authorFilippos Giannakos <philipgian@grnet.gr>
Wed, 17 Jul 2013 12:35:47 +0000 (15:35 +0300)
committerFilippos Giannakos <philipgian@grnet.gr>
Wed, 17 Jul 2013 12:35:47 +0000 (15:35 +0300)
Add the ability to hash snapshots and create pithos compatible images.

Also, as a minor improvement add object_to_map representation to map functions.

xseg/peers/user/mapper-handling.c
xseg/peers/user/mapper-version0.c
xseg/peers/user/mapper-version0.h
xseg/peers/user/mapper-version1.c
xseg/peers/user/mapper-version1.h
xseg/peers/user/mapper-version2.c
xseg/peers/user/mapper-version2.h
xseg/peers/user/mapper-versions.h
xseg/peers/user/mapper.c
xseg/peers/user/mapper.h

index dcd127a..17a764e 100644 (file)
@@ -49,6 +49,8 @@ static uint32_t waiters_for_req = 0;
 st_cond_t req_cond;
 char buf[XSEG_MAX_TARGETLEN + 1];
 
+
+
 char * null_terminate(char *target, uint32_t targetlen)
 {
        if (targetlen > XSEG_MAX_TARGETLEN)
@@ -346,6 +348,11 @@ int write_map(struct peer_req* pr, struct map *map)
 {
        int r;
        map->state |= MF_MAP_WRITING;
+       struct mapper_io *mio = __get_mapper_io(pr);
+
+       mio->cb = NULL;
+       mio->err = 0;
+
        r = map_functions[map->version].write_map_metadata(pr, map);
        if (r < 0)
                goto out;
@@ -722,7 +729,6 @@ static int __copyup_write_cb(struct peer_req *pr, struct xseg_request *req,
                struct map_node *mn)
 {
        struct peerd *peer = pr->peer;
-       struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node tmp;
        char *data;
 
@@ -937,131 +943,166 @@ out_err:
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
        return  NULL;
 }
-void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
+#endif
+
+void hash_cb(struct peer_req *pr, struct xseg_request *req)
 {
-       struct peerd *peer = pr->peer;
-       struct mapperd *mapper = __get_mapperd(peer);
-       (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
+       struct peerd *peer = pr->peer;
        struct map_node *mn = __get_node(mio, req);
-       if (!mn){
-               XSEGLOG2(&lc, E, "Cannot get map node");
-               goto out_err;
-       }
-       __set_node(mio, req, NULL);
+       struct xseg_reply_hash *xreply;
 
-       if (req->state & XS_FAILED){
-               if (req->op == X_DELETE){
-                       XSEGLOG2(&lc, E, "Delete req failed");
-                       goto out_ok;
-               }
-               XSEGLOG2(&lc, E, "Req failed");
-               mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
-               mn->flags &= ~MF_OBJECT_WRITING;
-               goto out_err;
+       XSEGLOG2(&lc, I, "Callback of req %p", req);
+
+       if (!mn) {
+               XSEGLOG2(&lc, E, "Cannot get mapnode");
+               mio->err = 1;
+               goto out_nonode;
        }
 
-       if (req->op == X_WRITE) {
-               char old_object_name[MAX_OBJECT_LEN + 1];
-               uint32_t old_objectlen;
-
-               char *target = xseg_get_target(peer->xseg, req);
-               (void)target;
-               //assert mn->flags & MF_OBJECT_WRITING
-               mn->flags &= ~MF_OBJECT_WRITING;
-               strncpy(old_object_name, mn->object, mn->objectlen);
-               old_objectlen = mn->objectlen;
-
-               struct map_node tmp;
-               char *data = xseg_get_data(peer->xseg, req);
-               map_functions[mn->map->version].read_object(&tmp, (unsigned char *)data);
-               mn->flags &= ~MF_OBJECT_WRITABLE;
-
-               strncpy(mn->object, tmp.object, tmp.objectlen);
-               mn->object[tmp.objectlen] = 0;
-               mn->objectlen = tmp.objectlen;
-               XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
-               //signal_mapnode since Snapshot was successfull
-               signal_mapnode(mn);
+       if (req->state & XS_FAILED) {
+               mio->err = 1;
+               XSEGLOG2(&lc, E, "Request failed");
+               goto out;
+       }
 
-               //do delete old object
-               strncpy(tmp.object, old_object_name, old_objectlen);
-               tmp.object[old_objectlen] = 0;
-               tmp.objectlen = old_objectlen;
-               tmp.flags = MF_OBJECT_WRITABLE;
-               struct xseg_request *xreq = __delete_object(pr, &tmp);
-               if (!xreq){
-                       //just a warning. Snapshot was successfull
-                       XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
-                       goto out_ok;
-               }
-               //overwrite copyup node, since tmp is a stack dummy variable
-               __set_node (mio, xreq, mn);
-               XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
-       } else if (req->op == X_SNAPSHOT) {
-               //issue write_object;
-               mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
-               struct map *map = mn->map;
-               if (!map){
-                       XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
-                       goto out_err;
-               }
+       if (req->serviced != req->size) {
+               mio->err = 1;
+               XSEGLOG2(&lc, E, "Serviced != size");
+               goto out;
+       }
 
-               /* construct a tmp map_node for writing purposes */
-               //char *target = xseg_get_target(peer->xseg, req);
-               struct map_node newmn = *mn;
-               newmn.flags = 0;
-               struct xseg_reply_snapshot *xreply;
-               xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
-               //assert xreply->targetlen !=0
-               //assert xreply->targetlen < XSEG_MAX_TARGETLEN
-               //xreply->target[xreply->targetlen] = 0;
-               //assert xreply->target valid
-               strncpy(newmn.object, xreply->target, xreply->targetlen);
-               newmn.object[req->targetlen] = 0;
-               newmn.objectlen = req->targetlen;
-               newmn.objectidx = mn->objectidx;
-               struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
-               if (!xreq){
-                       XSEGLOG2(&lc, E, "Object write returned error for object %s"
-                                       "\n\t of map %s [%llu]",
-                                       mn->object, map->volume, (unsigned long long) mn->objectidx);
-                       goto out_err;
-               }
-               mn->flags |= MF_OBJECT_WRITING;
-               __set_node (mio, xreq, mn);
-
-               XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
-       } else if (req->op == X_DELETE){
-               //deletion of the old block completed
-               XSEGLOG2(&lc, I, "Deletion of completed");
-               goto out_ok;
-               ;
-       } else {
-               //wtf??
-               ;
+       xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, req);
+       if (xreply->targetlen != HEXLIFIED_SHA256_DIGEST_SIZE) {
+               XSEGLOG2(&lc, E, "Reply targetlen != HEXLIFIED_SHA256_DIGEST_SIZE");
+               mio->err =1;
+               goto out;
        }
 
+       strncpy(mn->object, xreply->target, HEXLIFIED_SHA256_DIGEST_SIZE);
+       mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
+       mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
+       XSEGLOG2(&lc, D, "Received hash object %llu: %s (%p)",
+                       mn->objectidx, mn->object, mn);
+       mn->flags = 0;
+
 out:
+       put_mapnode(mn);
+       __set_node(mio, req, NULL);
+out_nonode:
        put_request(pr, req);
+       mio->pending_reqs--;
+       signal_pr(pr);
        return;
+}
 
-out_err:
-       mio->snap_pending--;
-       XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
-       mio->err = 1;
-       if (mn)
-               signal_mapnode(mn);
-       signal_pr(pr);
-       goto out;
 
-out_ok:
-       mio->snap_pending--;
-       signal_pr(pr);
-       goto out;
+int __hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map)
+{
+       struct mapperd *mapper = __get_mapperd(pr->peer);
+       struct mapper_io *mio = __get_mapper_io(pr);
+       uint64_t i;
+       struct map_node *mn, *hashed_mn;
+       struct xseg_request *req;
+       int r;
 
+       mio->priv = 0;
 
+       for (i = 0; i < map->nr_objs; i++) {
+               mn = get_mapnode(map, i);
+               if (!mn) {
+                       XSEGLOG2(&lc, E, "Cannot get mapnode %llu of map %s ",
+                                       "(nr_objs: %llu)", i, map->volume,
+                                       map->nr_objs);
+                       return -1;
+               }
+               hashed_mn = get_mapnode(hashed_map, i);
+               if (!hashed_mn) {
+                       XSEGLOG2(&lc, E, "Cannot get mapnode %llu of map %s ",
+                                       "(nr_objs: %llu)", i, hashed_map->volume,
+                                       hashed_map->nr_objs);
+                       put_mapnode(mn);
+                       return -1;
+               }
+               if (!(mn->flags & MF_OBJECT_ARCHIP)) {
+                       mio->priv++;
+                       strncpy(hashed_mn->object, mn->object, mn->objectlen);
+                       hashed_mn->objectlen = mn->objectlen;
+                       hashed_mn->object[hashed_mn->objectlen] = 0;
+                       hashed_mn->flags = mn->flags;
+
+                       put_mapnode(mn);
+                       put_mapnode(hashed_mn);
+                       continue;
+               }
+
+               req = get_request(pr, mapper->bportno, mn->object,
+                               mn->objectlen, 0);
+               if (!req){
+                       XSEGLOG2(&lc, E, "Cannot get request for map %s",
+                                       map->volume);
+                       put_mapnode(mn);
+                       put_mapnode(hashed_mn);
+                       return -1;
+               }
+
+               req->op = X_HASH;
+               req->offset = 0;
+               req->size = map->blocksize;
+               r = __set_node(mio, req, hashed_mn);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Cannot set node");
+                       put_request(pr, req);
+                       put_mapnode(mn);
+                       put_mapnode(hashed_mn);
+                       return -1;
+               }
+
+               r = send_request(pr, req);
+               if (r < 0) {
+                       XSEGLOG2(&lc, E, "Cannot send request %p, pr: %p, map: %s",
+                                       req, pr, map->volume);
+                       put_request(pr, req);
+                       __set_node(mio, req, NULL);
+                       put_mapnode(mn);
+                       put_mapnode(hashed_mn);
+                       return -1;
+               }
+               mio->pending_reqs++;
+               put_mapnode(mn);
+       }
+
+       return 0;
 }
 
-#endif
+int hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map)
+{
+       int r;
+       struct mapper_io *mio = __get_mapper_io(pr);
+
+       XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
+       map->state |= MF_MAP_HASHING;
+       mio->pending_reqs = 0;
+       mio->cb = hash_cb;
+       mio->err = 0;
 
+
+       r = __hash_map(pr, map, hashed_map);
+       if (r < 0) {
+               mio->err = 1;
+       }
+
+       if (mio->pending_reqs) {
+               wait_on_pr(pr, mio->pending_reqs >0);
+       }
+
+       mio->cb = NULL;
+       map->state &= ~MF_MAP_HASHING;
+       if (mio->err) {
+               XSEGLOG2(&lc, E, "Hashing map %s failed", map->volume);
+               return -1;
+       } else {
+               XSEGLOG2(&lc, I, "Hashing map %s completed", map->volume);
+               return 0;
+       }
+}
index e249c34..8865beb 100644 (file)
@@ -48,7 +48,7 @@ int read_object_v0(struct map_node *mn, unsigned char *buf)
        return 0;
 }
 
-void v0_object_to_map(unsigned char *data, struct map_node *mn)
+void object_to_map_v0(unsigned char *data, struct map_node *mn)
 {
        unhexlify(mn->object, data);
        //if name == zero block, raize MF_OBJECT_ZERO
@@ -74,7 +74,7 @@ struct xseg_request * prepare_write_object_v0(struct peer_req *pr, struct map *m
        req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
 
        data = xseg_get_data(pr->peer->xseg, req);
-       v0_object_to_map((unsigned char *)data, mn);
+       object_to_map_v0((unsigned char *)data, mn);
        return req;
 }
 
@@ -149,7 +149,7 @@ struct xseg_request * __write_map_data_v0(struct peer_req *pr, struct map *map)
        pos = 0;
        for (i = 0; i < map->nr_objs; i++) {
                mn = &map->objects[i];
-               v0_object_to_map((unsigned char *)(data+pos), mn);
+               object_to_map_v0((unsigned char *)(data+pos), mn);
                pos += v0_objectsize_in_map;
        }
 
index c2ddc40..f1c17bc 100644 (file)
@@ -56,7 +56,7 @@
 #define v0_objectsize_in_map (SHA256_DIGEST_SIZE)
 
 int read_object_v0(struct map_node *mn, unsigned char *buf);
-//void v0_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v0(unsigned char* buf, struct map_node *mn);
 struct xseg_request * prepare_write_object_v0(struct peer_req *pr,
                                struct map *map, struct map_node *mn);
 //int read_map_v0(struct map *m, unsigned char * data);
@@ -68,6 +68,7 @@ int write_map_data_v0(struct peer_req *pr, struct map *map);
 
 /*.read_map = read_map_v0,     \*/
 #define map_functions_v0 {                             \
+                       .object_to_map = object_to_map_v0, \
                        .read_object = read_object_v0,  \
                        .prepare_write_object = prepare_write_object_v0,\
                        .load_map_data = load_map_data_v0, \
index 4c179c6..1e920e9 100644 (file)
@@ -59,7 +59,7 @@ int read_object_v1(struct map_node *mn, unsigned char *buf)
        return 0;
 }
 
-void v1_object_to_map(unsigned char* buf, struct map_node *mn)
+void object_to_map_v1(unsigned char* buf, struct map_node *mn)
 {
        buf[0] = (mn->flags & MF_OBJECT_WRITABLE)? 1 : 0;
        //assert !(mn->flags & MF_OBJECT_ARCHIP)
@@ -94,7 +94,7 @@ struct xseg_request * prepare_write_object_v1(struct peer_req *pr, struct map *m
        req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
 
        data = xseg_get_data(pr->peer->xseg, req);
-       v1_object_to_map((unsigned char *)data, mn);
+       object_to_map_v1((unsigned char *)data, mn);
        return NULL;
 }
 
@@ -160,7 +160,7 @@ struct xseg_request * __write_map_data_v1(struct peer_req *pr, struct map *map)
        pos = 0;
        for (i = 0; i < map->nr_objs; i++) {
                mn = &map->objects[i];
-               v1_object_to_map((unsigned char *)(data+pos), mn);
+               object_to_map_v1((unsigned char *)(data+pos), mn);
                pos += v1_objectsize_in_map;
        }
 
index ac90dd4..edc02ac 100644 (file)
@@ -59,7 +59,7 @@
 #define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
 
 int read_object_v1(struct map_node *mn, unsigned char *buf);
-//void v1_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v1(unsigned char* buf, struct map_node *mn);
 struct xseg_request * prepare_write_object_v1(struct peer_req *pr,
                                struct map *map, struct map_node *mn);
 //int read_map_v1(struct map *m, unsigned char * data);
@@ -71,6 +71,7 @@ int write_map_data_v1(struct peer_req *pr, struct map *map);
 
 /*.read_map = read_map_v1,     \*/
 #define map_functions_v1 {                             \
+                       .object_to_map = object_to_map_v1, \
                        .read_object = read_object_v1,  \
                        .prepare_write_object = prepare_write_object_v1,\
                        .load_map_data = load_map_data_v1, \
index 1dc6a7a..4d36a4f 100644 (file)
 
 /* v2 functions */
 
-int initialize_map_objects(struct map *map)
-{
-       uint64_t i;
-       struct map_node *map_node = map->objects;
-
-       if (!map_node)
-               return -1;
-
-       for (i = 0; i < map->nr_objs; i++) {
-               map_node[i].map = map;
-               map_node[i].objectidx = i;
-               map_node[i].waiters = 0;
-               map_node[i].state = 0;
-               map_node[i].ref = 1;
-               map_node[i].cond = st_cond_new(); //FIXME err check;
-       }
-       return 0;
-}
-
-uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
+static uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
 {
        uint32_t targetlen;
        char buf[sizeof(block_id)*2 + 1];
@@ -78,7 +59,7 @@ struct obj2chunk {
        uint32_t len;
 };
 
-struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
+static struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
 {
        struct obj2chunk ret;
        uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
@@ -141,7 +122,7 @@ int read_object_v2(struct map_node *mn, unsigned char *buf)
        return 0;
 }
 
-void v2_object_to_map(unsigned char* buf, struct map_node *mn)
+void object_to_map_v2(unsigned char* buf, struct map_node *mn)
 {
        uint32_t *objectlen;
        uint32_t len;
@@ -200,7 +181,7 @@ struct xseg_request * prepare_write_objects_o2c_v2(struct peer_req *pr, struct m
        pos = 0;
        for (k = o2c.start_obj; k < limit; k++) {
                mn = &map->objects[k];
-               v2_object_to_map((unsigned char *)(data+pos), mn);
+               object_to_map_v2((unsigned char *)(data+pos), mn);
                pos += v2_objectsize_in_map;
        }
 
@@ -231,7 +212,7 @@ struct xseg_request * prepare_write_object_v2(struct peer_req *pr, struct map *m
                return NULL;
 
        data = xseg_get_data(peer->xseg, req);
-       v2_object_to_map((unsigned char *)(data), mn);
+       object_to_map_v2((unsigned char *)(data), mn);
        return req;
 }
 
@@ -273,6 +254,8 @@ int read_map_objects_v2(struct map *map, unsigned char *data, uint64_t start, ui
                }
        }
 
+       map_node = map->objects;
+
        for (i = start; i < nr; i++) {
                r = read_object_v2(&map_node[i], data+pos);
                if (r < 0) {
@@ -374,7 +357,7 @@ int __write_map_data_v2(struct peer_req *pr, struct map *map)
                        pos = 0;
                        for (k = start; k < map->nr_objs && k < limit; k++) {
                                mn = &map->objects[k];
-                               v2_object_to_map((unsigned char *)(data+pos), mn);
+                               object_to_map_v2((unsigned char *)(data+pos), mn);
                                pos += v2_objectsize_in_map;
                        }
 
index a7fbfee..c6b08ab 100644 (file)
@@ -71,7 +71,7 @@
                           sizeof(uint64_t))
 
 int read_object_v2(struct map_node *mn, unsigned char *buf);
-//void v2_object_to_map(unsigned char* buf, struct map_node *mn);
+void object_to_map_v2(unsigned char* buf, struct map_node *mn);
 struct xseg_request * prepare_write_object_v2(struct peer_req *pr,
                                struct map *map, struct map_node *mn);
 //int read_map_v2(struct map *m, unsigned char * data);
@@ -85,6 +85,7 @@ int write_map_data_v2(struct peer_req *pr, struct map *map);
 /*.read_map = read_map_v2,     \*/
 #define map_functions_v2 {                             \
                        .read_object = read_object_v2,  \
+                       .object_to_map = object_to_map_v2, \
                        .prepare_write_object = prepare_write_object_v2,\
                        .load_map_data = load_map_data_v2, \
                        .write_map_metadata = write_map_metadata_v2, \
index bc14605..8047a78 100644 (file)
@@ -96,6 +96,7 @@
  */
 
 struct map_functions {
+       void (*object_to_map)(unsigned char *buf, struct map_node *mn);
        int (*read_object)(struct map_node *mn, unsigned char *buf);
        struct xseg_request * (*prepare_write_object)(struct peer_req *pr,
                        struct map *map, struct map_node *mn);
index 6296c9a..6e6b882 100644 (file)
@@ -176,7 +176,7 @@ out:
        return r;
 }
 
-static inline struct map_node * get_mapnode(struct map *map, uint64_t index)
+inline struct map_node * get_mapnode(struct map *map, uint64_t index)
 {
        struct map_node *mn;
        if (index >= map->nr_objs) {
@@ -190,18 +190,41 @@ static inline struct map_node * get_mapnode(struct map *map, uint64_t index)
        }
        mn = &map->objects[index];
        mn->ref++;
+       XSEGLOG2(&lc, D,  "mapnode %p: ref: %u", mn, mn->ref);
        return mn;
 }
 
-static inline void put_mapnode(struct map_node *mn)
+inline void put_mapnode(struct map_node *mn)
 {
        mn->ref--;
+       XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);
        if (!mn->ref){
                //clean up mn
                st_cond_destroy(mn->cond);
        }
 }
 
+int initialize_map_objects(struct map *map)
+{
+       uint64_t i;
+       struct map_node *map_node = map->objects;
+
+       if (!map_node)
+               return -1;
+
+       for (i = 0; i < map->nr_objs; i++) {
+               map_node[i].map = map;
+               map_node[i].objectidx = i;
+               map_node[i].waiters = 0;
+               map_node[i].state = 0;
+               map_node[i].ref = 1;
+               map_node[i].cond = st_cond_new(); //FIXME err check;
+       }
+       return 0;
+}
+
+
+
 static inline void __get_map(struct map *map)
 {
        map->ref++;
@@ -584,133 +607,123 @@ static int do_close(struct peer_req *pr, struct map *map)
        return dropcache(pr, map);
 }
 
-/* do_hash:
- *
- * send hash, overwrite object to the map, and do one write for the whole map.
- */
-#if 0
 static int do_hash(struct peer_req *pr, struct map *map)
 {
-       uint64_t i;
+       int r;
        struct peerd *peer = pr->peer;
-       struct mapper_io *mio = __get_mapper_io(pr);
+       uint64_t i, bufsize;
+       struct map *hashed_map;
+       unsigned char sha[SHA256_DIGEST_SIZE];
+       unsigned char *buf = NULL;
+       char newvolumename[MAX_VOLUME_LEN];
+       uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
+       uint64_t pos = 0;
+       char targetbuf[XSEG_MAX_TARGETLEN];
+       char *target;
+       struct xseg_reply_hash *xreply;
        struct map_node *mn;
-       struct xseg_request *req;
 
-       if (!(map->state & MF_MAP_EXCLUSIVE)){
-               XSEGLOG2(&lc, E, "Map was not opened exclusively");
+       if (!(map->flags & MF_MAP_READONLY)) {
+               XSEGLOG2(&lc, E, "Cannot hash live volumes");
                return -1;
        }
-       XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
-       map->state |= MF_MAP_SNAPSHOTTING;
-
-       uint64_t nr_obj = calc_map_obj(map);
-       mio->cb = snapshot_cb;
-       mio->snap_pending = 0;
-       mio->err = 0;
-       for (i = 0; i < nr_obj; i++){
 
-               /* throttle pending snapshots
-                * this should be nr_ops of the blocker, but since we don't know
-                * that, we assume based on our own nr_ops
-                */
-               if (mio->snap_pending >= peer->nr_ops)
-                       wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
+       XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
+       /* prepare hashed_map holder */
+       hashed_map = create_map("", 0, 0);
+       if (!hashed_map) {
+               XSEGLOG2(&lc, E, "Cannot create hashed map");
+               return -1;
+       }
 
-               mn = get_mapnode(map, i);
-               if (!mn)
-                       //warning?
-                       continue;
-               if (!(mn->flags & MF_OBJECT_WRITABLE)){
-                       put_mapnode(mn);
-                       continue;
-               }
-               // make sure all pending operations on all objects are completed
-               if (mn->flags & MF_OBJECT_NOT_READY)
-                       wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
+       /* set map metadata */
+       hashed_map->size = map->size;
+       hashed_map->nr_objs = map->nr_objs;
+       hashed_map->flags = MF_MAP_READONLY;
+       hashed_map->blocksize = MAPPER_DEFAULT_BLOCKSIZE; /* FIXME, this should be PITHOS_BLOCK_SIZE right? */
 
-               /* TODO will this ever happen?? */
-               if (mn->flags & MF_OBJECT_DESTROYED){
-                       put_mapnode(mn);
-                       continue;
-               }
-
-               req = __snapshot_object(pr, mn);
-               if (!req){
-                       mio->err = 1;
-                       put_mapnode(mn);
-                       break;
-               }
-               mio->snap_pending++;
-               /* do not put_mapnode here. cb does that */
+       hashed_map->objects = calloc(map->nr_objs, sizeof(struct map_node));
+       if (!hashed_map->objects) {
+               XSEGLOG2(&lc, E, "Cannot allocate memory for %llu nr_objs",
+                               hashed_map->nr_objs);
+               r = -1;
+               goto out;
        }
 
-       if (mio->snap_pending > 0)
-               wait_on_pr(pr, mio->snap_pending > 0);
-       mio->cb = NULL;
+       r = initialize_map_objects(hashed_map);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Cannot initialize hashed_map objects");
+               goto out;
+       }
 
-       if (mio->err)
-               goto out_err;
+       r = hash_map(pr, map, hashed_map);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Cannot hash map %s", map->volume);
+               goto out;
+       }
 
-       /* calculate name of snapshot */
-       struct map tmp_map = *map;
-       unsigned char sha[SHA256_DIGEST_SIZE];
-       unsigned char *buf = malloc(MAPPER_DEFAULT_BLOCKSIZE);
-       char newvolumename[MAX_VOLUME_LEN];
-       uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
-       uint64_t pos = 0;
-       uint64_t max_objidx = calc_map_obj(map);
-       int r;
+       bufsize = hashed_map->nr_objs * v0_objectsize_in_map;
 
-       for (i = 0; i < max_objidx; i++) {
-               mn = find_object(map, i);
+       buf = malloc(bufsize);
+       if (!buf) {
+               XSEGLOG2(&lc, E, "Cannot allocate merkle_hash buffer of %llu bytes",
+                               bufsize);
+               goto out;
+       }
+       for (i = 0; i < hashed_map->nr_objs; i++) {
+               mn = get_mapnode(hashed_map, i);
                if (!mn){
-                       XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
-                                       (unsigned long long) i, map->volume);
-                       goto out_err;
+                       XSEGLOG2(&lc, E, "Cannot get object %llu for map %s",
+                                       i, hashed_map->volume);
+                       goto out;
                }
-               v0_object_to_map(mn, buf+pos);
+               map_functions[0].object_to_map(buf+pos, mn);
                pos += v0_objectsize_in_map;
+               put_mapnode(mn);
        }
-//     SHA256(buf, pos, sha);
+
        merkle_hash(buf, pos, sha);
-       hexlify(sha, newvolumename);
-       strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
-       tmp_map.volumelen = newvolumenamelen;
-       free(buf);
-       tmp_map.version = 0; // set volume version to pithos image
-
-       /* write the map of the Snapshot */
-       r = write_map(pr, &tmp_map);
-       if (r < 0)
-               goto out_err;
-       char targetbuf[XSEG_MAX_TARGETLEN];
-       char *target = xseg_get_target(peer->xseg, pr->req);
+       hexlify(sha, SHA256_DIGEST_SIZE, newvolumename);
+       strncpy(hashed_map->volume, newvolumename, newvolumenamelen);
+       hashed_map->volume[newvolumenamelen] = 0;
+       hashed_map->volumelen = newvolumenamelen;
+
+       /* write the hashed_map */
+       r = write_map(pr, hashed_map);
+       if (r < 0) {
+               XSEGLOG2(&lc, E, "Cannot write hashed_map %s", hashed_map->volume);
+               goto out;
+       }
+
+       /* Resize request to fit xhash reply */
+       target = xseg_get_target(peer->xseg, pr->req);
        strncpy(targetbuf, target, pr->req->targetlen);
+
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
-                       sizeof(struct xseg_reply_snapshot));
+                       sizeof(struct xseg_reply_hash));
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot resize request");
-               goto out_err;
+               goto out;
        }
+
        target = xseg_get_target(peer->xseg, pr->req);
        strncpy(target, targetbuf, pr->req->targetlen);
 
-       struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
-                                               xseg_get_data(peer->xseg, pr->req);
+       /* Put the target of the hashed_map on the reply */
+       xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, pr->req);
        strncpy(xreply->target, newvolumename, newvolumenamelen);
        xreply->targetlen = newvolumenamelen;
-       map->state &= ~MF_MAP_SNAPSHOTTING;
-       XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
-       return 0;
 
-out_err:
-       map->state &= ~MF_MAP_SNAPSHOTTING;
-       XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
-       return -1;
+out:
+       if (buf)
+               free(buf);
+       put_map(hashed_map);
+       if (r < 0) {
+               return -1;
+       } else {
+               return 0;
+       }
 }
-#endif
-
 
 static int do_snapshot(struct peer_req *pr, struct map *map)
 {
@@ -1390,6 +1403,23 @@ void * handle_snapshot(struct peer_req *pr)
        return NULL;
 }
 
+void * handle_hash(struct peer_req *pr)
+{
+       struct peerd *peer = pr->peer;
+       char *target = xseg_get_target(peer->xseg, pr->req);
+       /* Do not request exclusive access. Since we are hashing only shapshots
+        * which are read only, there is no need for locking
+        */
+       int r = map_action(do_hash, pr, target, pr->req->targetlen,
+                               MF_ARCHIP|MF_LOAD);
+       if (r < 0)
+               fail(peer, pr);
+       else
+               complete(peer, pr);
+       ta--;
+       return NULL;
+}
+
 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
                        struct xseg_request *req)
 {
@@ -1410,7 +1440,8 @@ int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
                case X_DELETE: action = handle_destroy; break;
                case X_OPEN: action = handle_open; break;
                case X_CLOSE: action = handle_close; break;
-               default: fprintf(stderr, "mydispatch: unknown up\n"); break;
+               case X_HASH: action = handle_hash; break;
+               default: fprintf(stderr, "mydispatch: unknown op\n"); break;
        }
        if (action){
                ta++;
index 0bcda67..ec6ba37 100644 (file)
@@ -134,9 +134,11 @@ struct map_node {
 //#define MF_MAP_DELETED               (1 << 8)
 #define MF_MAP_SNAPSHOTTING    (1 << 9)
 #define MF_MAP_SERIALIZING     (1 << 10)
+#define MF_MAP_HASHING         (1 << 11)
 #define MF_MAP_NOT_READY       (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
                                MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|          \
-                               MF_MAP_SNAPSHOTTING|MF_MAP_SERIALIZING)
+                               MF_MAP_SNAPSHOTTING|MF_MAP_SERIALIZING|        \
+                               MF_MAP_HASHING)
 
 struct map {
        uint32_t version;
@@ -304,5 +306,8 @@ struct xseg_request * get_request(struct peer_req *pr, xport dst, char * target,
 void put_request(struct peer_req *pr, struct xseg_request *req);
 struct xseg_request * __load_map_metadata(struct peer_req *pr, struct map *map);
 int load_map_metadata(struct peer_req *pr, struct map *map);
-
+int initialize_map_objects(struct map *map);
+int hash_map(struct peer_req *pr, struct map *map, struct map *hashed_map);
+struct map_node * get_mapnode(struct map *map, uint64_t objindex);
+void put_mapnode(struct map_node *mn);
 #endif /* end MAPPER_H */