/*
 * Copyright 2012 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A. OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */
#include <sys/types.h>
/* The following standard headers are assumed to have been elided from
 * this excerpt; the code below uses fprintf(), malloc()/calloc(),
 * strncpy() and sched_setscheduler(). */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sched.h>
#include <xseg/xseg.h>
#include <xtypes/xhash.h>
#include <xseg/protocol.h>
//#include <sys/stat.h>
#include <sys/syscall.h>
#include <mapper-versions.h>

uint64_t cur_count = 0;

extern st_cond_t req_cond;

/* Pithos considers this a block full of zeros, so should we.
 * It is actually the SHA-256 hash of nothing, i.e. of zero bytes of input.
 */
char *zero_block = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
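/* A quick sanity check of the constant above, assuming a sha256sum
 * binary is available:
 *
 *   $ printf '' | sha256sum
 *   e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855  -
 *
 * i.e. the SHA-256 digest of empty input. */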
void custom_peer_usage()
    fprintf(stderr, "Custom peer options: \n"
            "-bp : port for block blocker(!)\n"
            "-mbp : port for map blocker\n"

static uint32_t calc_nr_obj(struct xseg_request *req)
    uint64_t rem_size = req->size;
    uint64_t obj_offset = req->offset & (MAPPER_DEFAULT_BLOCKSIZE - 1); //modulo
    uint64_t obj_size = (rem_size + obj_offset > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE - obj_offset : rem_size;
    while (rem_size > 0) {
        obj_size = (rem_size > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE : rem_size;
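    /* A worked example of the arithmetic above, assuming a
     * MAPPER_DEFAULT_BLOCKSIZE of 4 MiB (1 << 22):
     *   req->offset = 6 MiB, req->size = 3 MiB
     *   obj_offset  = 6 MiB & (4 MiB - 1) = 2 MiB  (offset within first object)
     *   obj_size    = 4 MiB - 2 MiB       = 2 MiB  (first object covers 2 MiB)
     *   rem_size    = 3 MiB - 2 MiB       = 1 MiB  (second object covers the rest)
     * so the request spans 2 objects. The bitwise AND works as a modulo
     * only because the blocksize is a power of two. */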
/*
 * Map cache handling functions
 */

static struct map * find_map(struct mapperd *mapper, char *volume)
    int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,

static struct map * find_map_len(struct mapperd *mapper, char *target,
        uint32_t targetlen, uint32_t flags)
    char buf[XSEG_MAX_TARGETLEN+1];
    if (targetlen > MAX_VOLUME_LEN){
        XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
                 targetlen, MAX_VOLUME_LEN);
    if (flags & MF_ARCHIP){
        strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
        strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
        buf[MAPPER_PREFIX_LEN + targetlen] = 0;
        targetlen += MAPPER_PREFIX_LEN;
        strncpy(buf, target, targetlen);
    XSEGLOG2(&lc, D, "looking up map %s, len %u",
    return find_map(mapper, buf);

static int insert_map(struct mapperd *mapper, struct map *map)
    if (find_map(mapper, map->volume)){
        XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
    XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)",
             map->volume, strlen(map->volume), (unsigned long) map);
    r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
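    /* xhash_insert() returns -XHASH_ERESIZE when the hash table has no
     * room left; the loop below grows the table by one size shift and
     * retries until the insert succeeds. remove_map() further down uses
     * the same idiom with xhash_shrink_size_shift() to shrink. */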
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
        xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
            XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
                     (unsigned long long) shift);
        mapper->hashmaps = new_hashmap;
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);

static int remove_map(struct mapperd *mapper, struct map *map)
    //assert no pending pr on map
    r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
    while (r == -XHASH_ERESIZE) {
        xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
        xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
            XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
                     (unsigned long long) shift);
        mapper->hashmaps = new_hashmap;
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);

inline struct map_node * get_mapnode(struct map *map, uint64_t index)
    if (index >= map->nr_objs) {
//      XSEGLOG2(&lc, E, "Index out of range: %llu > %llu",
//               index, map->nr_objs);
//  XSEGLOG2(&lc, E, "Map %s has no objects", map->volume);
    mn = &map->objects[index];
    XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);

inline void put_mapnode(struct map_node *mn)
    XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);
    st_cond_destroy(mn->cond);
int initialize_map_objects(struct map *map)
    struct map_node *map_node = map->objects;
    for (i = 0; i < map->nr_objs; i++) {
        map_node[i].map = map;
        map_node[i].objectidx = i;
        map_node[i].waiters = 0;
        map_node[i].state = 0;
        map_node[i].cond = st_cond_new(); //FIXME err check;

static inline void __get_map(struct map *map)

static inline void put_map(struct map *map)
    XSEGLOG2(&lc, D, "Putting map %lx %s. ref %u", map, map->volume, map->ref);
        XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
        /*
         * Check that no object is still in use by another state
         * thread. This should always hold; otherwise there is a bug.
         * Since a thread must first get the map before it can
         * manipulate an object, the map ref can never hit zero while
         * another thread is still using an object.
         */
        for (i = 0; i < map->nr_objs; i++) {
            mn = get_mapnode(map, i);
                //make sure all pending operations on all objects are completed
                if (mn->state & MF_OBJECT_NOT_READY) {
                    XSEGLOG2(&lc, E, "BUG: map node in use while freeing map");
                    wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
//              mn->state |= MF_OBJECT_DESTROYED;
                put_mapnode(mn); //matching mn->ref = 1 set on mn init
                put_mapnode(mn); //matching the get_mapnode() above
                //assert mn->ref == 0;
                    XSEGLOG2(&lc, E, "BUG: map node ref != 0 after final put");
        XSEGLOG2(&lc, I, "Freed map %s", map->volume);
static struct map * create_map(char *name, uint32_t namelen, uint32_t flags)
    if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
        XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
                 namelen, MAX_VOLUME_LEN);
    struct map *m = malloc(sizeof(struct map));
        XSEGLOG2(&lc, E, "Cannot allocate map");
    if (flags & MF_ARCHIP){
        strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
        strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
        m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
        m->volumelen = MAPPER_PREFIX_LEN + namelen;
        /* Use the latest map version here, when creating a new map. If
         * the map is read from storage, this version will be rewritten
         * with the right value.
         */
        m->version = MAP_LATEST_VERSION;
        strncpy(m->volume, name, namelen);
        m->volume[namelen] = 0;
        m->volumelen = namelen;
        m->version = 0; /* version 0 should be pithos maps */
        m->flags = MF_MAP_READONLY;
    m->cond = st_cond_new(); //FIXME err check;
    m->waiters_users = 0;
    m->users_cond = st_cond_new();
static void wait_all_map_objects_ready(struct map *map)
    //TODO: maybe add a counter on the map of how many objects are in
    //use, to speed up the common case where no objects are used.
    map->state |= MF_MAP_SERIALIZING;
    wait_all_objects_ready(map);
    for (i = 0; i < map->nr_objs; i++) {
        mn = get_mapnode(map, i);
            //make sure all pending operations on all objects are completed
            if (mn->state & MF_OBJECT_NOT_READY) {
                XSEGLOG2(&lc, E, "BUG: Map node %x of map %s, "
                         "idx: %llu is not ready",
//              wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
    map->state &= ~MF_MAP_SERIALIZING;
static int do_copyups(struct peer_req *pr, struct r2o *mns, int n)
    struct mapper_io *mio = __get_mapper_io(pr);
    int i, j, can_wait = 0;
    mio->pending_reqs = 0;
    /* do a first scan and issue as many copyups as we can.
     * then retry and wait when an object is not ready.
     * this could be done better, since now we wait also on the
     */
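    /* Sketch of the intent of the two passes below: on the first pass
     * (j == 0, can_wait unset) copyups are issued for objects that are
     * ready and busy objects are skipped; on the second pass (j == 1)
     * the loop waits on the objects that were skipped. */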
    for (j = 0; j < 2 && !mio->err; j++) {
        for (i = 0; i < n && !mio->err; i++) {
            if (mn->state & MF_OBJECT_NOT_READY){
                /* here mn->state should be
                 * MF_OBJECT_COPYING or MF_OBJECT_WRITING or
                 * later MF_OBJECT_HASHING.
                 * Otherwise it's a bug.
                 */
                if (mn->state != MF_OBJECT_COPYING
                        && mn->state != MF_OBJECT_WRITING) {
                    XSEGLOG2(&lc, E, "BUG: Map node has wrong state");
                wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
//              if (mn->state & MF_OBJECT_DESTROYED){
            if (!(mn->flags & MF_OBJECT_WRITABLE)) {
                //calc new_target, copy up object
                if (__copyup_object(pr, mn) == NULL){
                    XSEGLOG2(&lc, E, "Error in copy up object");
        XSEGLOG2(&lc, E, "Mio->err, pending_copyups: %d", mio->pending_reqs);
    if (mio->pending_reqs > 0)
        wait_on_pr(pr, mio->pending_reqs > 0);
    return mio->err ? -1 : 0;
static int req2objs(struct peer_req *pr, struct map *map, int write)
    struct peerd *peer = pr->peer;
    struct mapper_io *mio = __get_mapper_io(pr);
    char *target = xseg_get_target(peer->xseg, pr->req);
    uint32_t nr_objs = calc_nr_obj(pr->req);
    uint64_t size = sizeof(struct xseg_reply_map) +
            nr_objs * sizeof(struct xseg_reply_map_scatterlist);
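    /* The reply is a variable-length structure: one fixed
     * xseg_reply_map header followed by nr_objs
     * xseg_reply_map_scatterlist entries, one per object the request
     * touches; hence the sizing above. */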
    uint64_t rem_size, obj_index, obj_offset, obj_size;
    char buf[XSEG_MAX_TARGETLEN];
    struct xseg_reply_map *reply;

    XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);

    if (pr->req->offset + pr->req->size > map->size) {
        XSEGLOG2(&lc, E, "Invalid offset/size: offset: %llu, "
                 "size: %llu, map size: %llu",
                 pr->req->offset, pr->req->size, map->size);

    /* get map_nodes of request */
    struct r2o *mns = malloc(sizeof(struct r2o) * nr_objs);
        XSEGLOG2(&lc, E, "Cannot allocate mns");
    rem_size = pr->req->size;
    obj_index = pr->req->offset / MAPPER_DEFAULT_BLOCKSIZE;
    obj_offset = pr->req->offset & (MAPPER_DEFAULT_BLOCKSIZE - 1); //modulo
    obj_size = (obj_offset + rem_size > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE - obj_offset : rem_size;
    mn = get_mapnode(map, obj_index);
        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n",
                 (unsigned long long) obj_index);
    mns[idx].offset = obj_offset;
    mns[idx].size = obj_size;
    rem_size -= obj_size;
    while (rem_size > 0) {
        obj_size = (rem_size > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE : rem_size;
        rem_size -= obj_size;
        mn = get_mapnode(map, obj_index);
            XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
        mns[idx].offset = obj_offset;
        mns[idx].size = obj_size;
    if (do_copyups(pr, mns, idx+1) < 0) {
        XSEGLOG2(&lc, E, "do_copyups failed");

    /* resize request to fit reply */
    strncpy(buf, target, pr->req->targetlen);
    r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
        XSEGLOG2(&lc, E, "Cannot resize request");
    target = xseg_get_target(peer->xseg, pr->req);
    strncpy(target, buf, pr->req->targetlen);

    /* structure reply */
    reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
    reply->cnt = nr_objs;
    for (i = 0; i < (idx+1); i++) {
        strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
        reply->segs[i].targetlen = mns[i].mn->objectlen;
        reply->segs[i].offset = mns[i].offset;
        reply->segs[i].size = mns[i].size;
    for (i = 0; i < (idx+1); i++) {
        put_mapnode(mns[i].mn);
    signal_all_objects_ready(map);
static int do_info(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    struct xseg_reply_info *xinfo;
    struct xseg_request *req = pr->req;
    char buf[XSEG_MAX_TARGETLEN + 1];

    if (req->datalen < sizeof(struct xseg_reply_info)) {
        target = xseg_get_target(peer->xseg, req);
        strncpy(buf, target, req->targetlen);
        r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
            XSEGLOG2(&lc, E, "Cannot resize request");
        target = xseg_get_target(peer->xseg, req);
        strncpy(target, buf, req->targetlen);
    xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, req);
    xinfo->size = map->size;
static int do_open(struct peer_req *pr, struct map *map)
    if (map->state & MF_MAP_EXCLUSIVE) {

static int dropcache(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
    /*
     * We can lazily drop the cache from here by simply removing the map
     * from the maps hashmap, making it inaccessible to future requests.
     * This is safe because:
     *
     * a) Dropping the cache for a map is serialized on a map level, so
     * there should not be any other threads modifying the struct map.
     *
     * b) Any other thread manipulating the map nodes should not have
     * any pending requests on a map node if the map is not opened
     * exclusively. If it is opened exclusively, then we should not
     * close the map, i.e. release the map lock, without checking for
     * pending requests. Furthermore, since each operation on a map
     * takes a map reference, the memory will not be freed unless every
     * request has finished processing the map.
     */

    /* Set map as destroyed to notify any waiters that hold a reference to
     */
    r = remove_map(mapper, map);
        XSEGLOG2(&lc, E, "Remove map %s from hashmap failed", map->volume);
        XSEGLOG2(&lc, E, "Dropping cache for map %s failed", map->volume);
    map->state |= MF_MAP_DESTROYED;
    XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
    put_map(map); // put map here to destroy it (matches m->ref = 1 on map create)
static int do_close(struct peer_req *pr, struct map *map)
    if (!(map->state & MF_MAP_EXCLUSIVE)) {
        XSEGLOG2(&lc, E, "Attempted to close a map that was not opened");

    /* Do not close the map while there are pending requests on the
     */
    wait_all_map_objects_ready(map);
    if (close_map(pr, map) < 0) {

static int do_hash(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    struct map *hashed_map;
    unsigned char sha[SHA256_DIGEST_SIZE];
    unsigned char *buf = NULL;
    char newvolumename[MAX_VOLUME_LEN];
    uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
    char targetbuf[XSEG_MAX_TARGETLEN];
    struct xseg_reply_hash *xreply;
    if (!(map->flags & MF_MAP_READONLY)) {
        XSEGLOG2(&lc, E, "Cannot hash live volumes");

    XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
    /* prepare hashed_map holder */
    hashed_map = create_map("", 0, 0);
        XSEGLOG2(&lc, E, "Cannot create hashed map");

    /* set map metadata */
    hashed_map->size = map->size;
    hashed_map->nr_objs = map->nr_objs;
    hashed_map->flags = MF_MAP_READONLY;
    hashed_map->blocksize = MAPPER_DEFAULT_BLOCKSIZE; /* FIXME, this should be PITHOS_BLOCK_SIZE right? */

    hashed_map->objects = calloc(map->nr_objs, sizeof(struct map_node));
    if (!hashed_map->objects) {
        XSEGLOG2(&lc, E, "Cannot allocate memory for %llu nr_objs",
                 hashed_map->nr_objs);

    r = initialize_map_objects(hashed_map);
        XSEGLOG2(&lc, E, "Cannot initialize hashed_map objects");

    r = hash_map(pr, map, hashed_map);
        XSEGLOG2(&lc, E, "Cannot hash map %s", map->volume);

    bufsize = hashed_map->nr_objs * v0_objectsize_in_map;
    buf = malloc(bufsize);
        XSEGLOG2(&lc, E, "Cannot allocate merkle_hash buffer of %llu bytes",
    for (i = 0; i < hashed_map->nr_objs; i++) {
        mn = get_mapnode(hashed_map, i);
            XSEGLOG2(&lc, E, "Cannot get object %llu for map %s",
                     i, hashed_map->volume);
        map_functions[0].object_to_map(buf+pos, mn);
        pos += v0_objectsize_in_map;

    merkle_hash(buf, pos, sha);
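    /* hexlify() below presumably writes two hex characters per digest
     * byte, so the 32-byte SHA-256 digest becomes a 64-character name;
     * newvolumenamelen (HEXLIFIED_SHA256_DIGEST_SIZE) should match. */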
    hexlify(sha, SHA256_DIGEST_SIZE, newvolumename);
    strncpy(hashed_map->volume, newvolumename, newvolumenamelen);
    hashed_map->volume[newvolumenamelen] = 0;
    hashed_map->volumelen = newvolumenamelen;

    /* write the hashed_map */
    r = write_map(pr, hashed_map);
        XSEGLOG2(&lc, E, "Cannot write hashed_map %s", hashed_map->volume);

    /* Resize request to fit the hash reply */
    target = xseg_get_target(peer->xseg, pr->req);
    strncpy(targetbuf, target, pr->req->targetlen);
    r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
                            sizeof(struct xseg_reply_hash));
        XSEGLOG2(&lc, E, "Cannot resize request");
    target = xseg_get_target(peer->xseg, pr->req);
    strncpy(target, targetbuf, pr->req->targetlen);

    /* Put the target of the hashed_map on the reply */
    xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, pr->req);
    strncpy(xreply->target, newvolumename, newvolumenamelen);
    xreply->targetlen = newvolumenamelen;
static int do_snapshot(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    //struct mapper_io *mio = __get_mapper_io(pr);
    struct map *snap_map;
    struct xseg_request_snapshot *xsnapshot;
    uint32_t snapnamelen;

    xsnapshot = (struct xseg_request_snapshot *)xseg_get_data(peer->xseg, pr->req);
    snapname = xsnapshot->target;
    snapnamelen = xsnapshot->targetlen;
        XSEGLOG2(&lc, E, "Snapshot name must be provided");

    if (!(map->state & MF_MAP_EXCLUSIVE)) {
        XSEGLOG2(&lc, E, "Map was not opened exclusively");
    if (map->epoch == UINT64_MAX) {
        XSEGLOG2(&lc, E, "Max epoch reached for %s", map->volume);
    XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
    map->state |= MF_MAP_SNAPSHOTTING;

    //create new map struct with the snapshot name and the readonly flag.
    snap_map = create_map(snapname, snapnamelen, MF_ARCHIP);

    //open/load map to check if snap exists
    r = open_map(pr, snap_map, 0);
        XSEGLOG2(&lc, E, "Could not open snap map");
        XSEGLOG2(&lc, E, "Snapshot exists");
    r = load_map_metadata(pr, snap_map);
    if (r >= 0 && !(snap_map->flags & MF_MAP_DELETED)) {
        XSEGLOG2(&lc, E, "Snapshot exists");

    //snap_map->flags &= ~MF_MAP_DELETED;
    snap_map->flags = MF_MAP_READONLY;
    snap_map->objects = map->objects;
    snap_map->size = map->size;
    snap_map->blocksize = map->blocksize;
    snap_map->nr_objs = map->nr_objs;

    nr_objs = map->nr_objs;

    //set all map_nodes read only;
    //TODO: maybe skip that check and add an epoch number on each object.
    //Then we can check that an object is writable iff object epoch == map epoch.
    wait_all_map_objects_ready(map);
    for (i = 0; i < nr_objs; i++) {
        mn = get_mapnode(map, i);
            XSEGLOG2(&lc, E, "Could not get map node %llu for map %s",

        // Make sure all pending operations on all objects are completed.
        // Basically make sure that no previous copyup operation will
        // mess with our state.
        // This works, since only a map_w that was processed before this
        // request can have issued an object write request which may
        // still be pending. Since the objects are processed in the same
        // order by the copyup operation and the snapshot operation, we
        // can be sure that no previously ready objects have changed
        // their state into not ready.
        // No other operation that manipulates map objects can occur
        // simultaneously with the snapshot operation.
        if (mn->state & MF_OBJECT_NOT_READY)
            XSEGLOG2(&lc, E, "BUG: object not ready");
//          wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);

        mn->flags &= ~MF_OBJECT_WRITABLE;
    r = write_map(pr, map);
        XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
        /* Not restoring the epoch or the writable status here is not
         * devastating, since this is not the common case, and it can
         * only cause unneeded copy-on-write operations.
         */

    r = write_map(pr, snap_map);
        XSEGLOG2(&lc, E, "Write of snapshot map failed");

    close_map(pr, snap_map);
    snap_map->objects = NULL;

    map->state &= ~MF_MAP_SNAPSHOTTING;
    if (map->opened_count == cur_count)
    XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);

    snap_map->objects = NULL;
    close_map(pr, snap_map);
    map->state &= ~MF_MAP_SNAPSHOTTING;
    XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
/* This should probably be a map function */
static int do_destroy(struct peer_req *pr, struct map *map)
    //uint64_t i, nr_obj;
    //struct peerd *peer = pr->peer;
    //struct mapper_io *mio = __get_mapper_io(pr);
    //struct map_node *mn;
    //struct xseg_request *req;

    if (!(map->state & MF_MAP_EXCLUSIVE))

    if (map->flags & MF_MAP_DELETED) {
        XSEGLOG2(&lc, E, "Map %s already deleted", map->volume);

    XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
    map->state |= MF_MAP_DELETING;
    map->flags |= MF_MAP_DELETED;
    /* Just write the map here. The only things that matter are the map
     * flags, which will not be overwritten by any other concurrent map
     * write that can be caused by a copyup. Also, if by any chance the
     * volume is recreated and there are pending copyups from the old
     * node, they will not mess with the new one. So let's just be fast.
     */
    /* we could write only the metadata here to speed things up */
    /* Also, we could delete/truncate the unnecessary map blocks, i.e. all
     * but the metadata, but that would require making sure there are no
     * pending operations on any block, i.e. wait_all_map_objects_ready().
     * Or we can do it later, with garbage collection.
     */
    r = write_map_metadata(pr, map);
        map->state &= ~MF_MAP_DELETING;
        XSEGLOG2(&lc, E, "Failed to destroy map %s", map->volume);

    map->state &= ~MF_MAP_DELETING;
    XSEGLOG2(&lc, I, "Deleted map %s", map->volume);
    /* do_close will drop the map from the cache */

    /* if do_close fails, an error message will be logged, but the
     * deletion was successful, and there isn't much to do about the
     * error.
     */
static int do_mapr(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    int r = req2objs(pr, map, 0);
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
                 (unsigned long long) pr->req->offset,
                 (unsigned long long) (pr->req->offset + pr->req->size));
    XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
             (unsigned long long) pr->req->offset,
             (unsigned long long) (pr->req->offset + pr->req->size));
    XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
             (unsigned long long) pr->req->offset,
             (unsigned long long) pr->req->size);
    char buf[XSEG_MAX_TARGETLEN+1];
    struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
    for (i = 0; i < reply->cnt; i++) {
        XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
        strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
        buf[reply->segs[i].targetlen] = 0;
        XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
                 (unsigned long long) reply->segs[i].offset,
                 (unsigned long long) reply->segs[i].size);
static int do_mapw(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    if (map->flags & MF_MAP_READONLY) {
        XSEGLOG2(&lc, E, "Cannot write to a read only map");
    r = req2objs(pr, map, 1);
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
                 (unsigned long long) pr->req->offset,
                 (unsigned long long) (pr->req->offset + pr->req->size));
    XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
             (unsigned long long) pr->req->offset,
             (unsigned long long) (pr->req->offset + pr->req->size));
    XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
             (unsigned long long) pr->req->offset,
             (unsigned long long) pr->req->size);
    char buf[XSEG_MAX_TARGETLEN+1];
    struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
    for (i = 0; i < reply->cnt; i++) {
        XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u", i, reply->cnt);
        strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
        buf[reply->segs[i].targetlen] = 0;
        XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
                 (unsigned long long) reply->segs[i].offset,
                 (unsigned long long) reply->segs[i].size);
//here map is the parent map
static int do_clone(struct peer_req *pr, struct map *map)
    struct peerd *peer = pr->peer;
    //struct mapperd *mapper = __get_mapperd(peer);
    char *target = xseg_get_target(peer->xseg, pr->req);
    struct map *clonemap;
    struct map_node *map_nodes, *mn;
    struct xseg_request_clone *xclone =
            (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);

    if (!(map->flags & MF_MAP_READONLY)) {
        XSEGLOG2(&lc, E, "Cloning is supported only from a snapshot");

    XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
    clonemap = create_map(target, pr->req->targetlen, MF_ARCHIP);
        XSEGLOG2(&lc, E, "Create map failed");

    /* open map to get exclusive access to map */
    r = open_map(pr, clonemap, 0);
        XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
        XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
    r = load_map_metadata(pr, clonemap);
    if (r >= 0 && !(clonemap->flags & MF_MAP_DELETED)) {
        XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);

    /* Make sure we can take at least one snapshot of the new volume */
    if (map->epoch >= UINT64_MAX - 2) {
        XSEGLOG2(&lc, E, "Max epoch reached for %s", clonemap->volume);
    clonemap->flags &= ~MF_MAP_DELETED;

    if (!(xclone->size))
        clonemap->size = map->size;
        clonemap->size = xclone->size;
    if (clonemap->size < map->size){
        XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
                 "\n\t for requested clone %s",
                 (unsigned long long) clonemap->size,
                 (unsigned long long) map->size, clonemap->volume);

    clonemap->blocksize = MAPPER_DEFAULT_BLOCKSIZE;
    //alloc and init map_nodes
    c = calc_map_obj(clonemap);
    map_nodes = calloc(c, sizeof(struct map_node));
    clonemap->objects = map_nodes;
    clonemap->nr_objs = c;
    for (i = 0; i < c; i++) {
        mn = get_mapnode(map, i);
            strncpy(map_nodes[i].object, mn->object, mn->objectlen);
            map_nodes[i].objectlen = mn->objectlen;
            map_nodes[i].flags = 0;
            if (mn->flags & MF_OBJECT_ARCHIP)
                map_nodes[i].flags |= MF_OBJECT_ARCHIP;
            if (mn->flags & MF_OBJECT_ZERO)
                map_nodes[i].flags |= MF_OBJECT_ZERO;
            strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
            map_nodes[i].objectlen = ZERO_BLOCK_LEN;
            map_nodes[i].flags = MF_OBJECT_ZERO;
        map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
        map_nodes[i].state = 0;
        map_nodes[i].objectidx = i;
        map_nodes[i].map = clonemap;
        map_nodes[i].ref = 1;
        map_nodes[i].waiters = 0;
        map_nodes[i].cond = st_cond_new(); //FIXME errcheck;

    r = write_map(pr, clonemap);
        XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);

    XSEGLOG2(&lc, I, "Cloning map %s to %s completed",
             map->volume, clonemap->volume);
    close_map(pr, clonemap);

    close_map(pr, clonemap);
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
    if (flags & MF_EXCLUSIVE){
        r = open_map(pr, map, flags);
            if (flags & MF_FORCE){
    r = load_map(pr, map);
    if (r < 0 && opened){

struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    struct map *map = find_map_len(mapper, name, namelen, flags);
        if (flags & MF_LOAD){
            map = create_map(name, namelen, flags);
            r = insert_map(mapper, map);
                XSEGLOG2(&lc, E, "Cannot insert map %s", map->volume);
            r = open_load_map(pr, map, flags);
                /* signal map here, so any other threads that
                 * tried to get the map, but couldn't because
                 * of the opening or loading operation that
                 * failed, can continue.
                 */

    /* If the map is deleted, drop everything and return
     */
    if (map->flags & MF_MAP_DELETED){
        XSEGLOG2(&lc, E, "Loaded deleted map %s. Failing...",

static int map_action(int (action)(struct peer_req *pr, struct map *map),
        struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
    //struct peerd *peer = pr->peer;
    map = get_map(pr, name, namelen, flags);
    if (map->state & MF_MAP_NOT_READY){
        wait_on_map(map, (map->state & MF_MAP_NOT_READY));
    int r = action(pr, map);
    //always drop the cache if the map is not opened exclusively
    if (!(map->state & MF_MAP_EXCLUSIVE))
void * handle_info(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    int r = map_action(do_info, pr, target, pr->req->targetlen,

void * handle_clone(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    //struct mapperd *mapper = __get_mapperd(peer);
    struct xseg_request_clone *xclone;
    xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
    if (xclone->targetlen){
        /* if snap was defined */
        if (pr->req->flags & XF_CONTADDR)
            r = map_action(do_clone, pr, xclone->target,
                           xclone->targetlen, MF_LOAD);
            r = map_action(do_clone, pr, xclone->target,
                           xclone->targetlen, MF_LOAD|MF_ARCHIP);
        /* else try to create a new volume */
        XSEGLOG2(&lc, I, "Creating volume");
            XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
        char *target = xseg_get_target(peer->xseg, pr->req);

        //create a new empty map of size
        map = create_map(target, pr->req->targetlen, MF_ARCHIP);

        /* open map to get exclusive access to map */
        r = open_map(pr, map, 0);
            XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
            XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
        r = load_map_metadata(pr, map);
        if (r >= 0 && !(map->flags & MF_MAP_DELETED)) {
            XSEGLOG2(&lc, E, "Map exists %s", map->volume);
        if (map->epoch >= UINT64_MAX - 2) {
            XSEGLOG2(&lc, E, "Max epoch reached for %s", map->volume);
        map->flags &= ~MF_MAP_DELETED;
        map->size = xclone->size;
        map->blocksize = MAPPER_DEFAULT_BLOCKSIZE;
        map->nr_objs = calc_map_obj(map);
        uint64_t nr_objs = map->nr_objs;

        //populate_map with zero objects;
        struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
            XSEGLOG2(&lc, E, "Cannot allocate %llu nr_objs", nr_objs);
        map->objects = map_nodes;
        for (i = 0; i < nr_objs; i++) {
            strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
            map_nodes[i].objectlen = ZERO_BLOCK_LEN;
            map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
            map_nodes[i].flags = MF_OBJECT_ZERO; //MF_OBJECT_ARCHIP;
            map_nodes[i].state = 0;
            map_nodes[i].objectidx = i;
            map_nodes[i].map = map;
            map_nodes[i].ref = 1;
            map_nodes[i].waiters = 0;
            map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
        r = write_map(pr, map);
            XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
        XSEGLOG2(&lc, I, "Volume %s created", map->volume);
void * handle_mapr(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    int r = map_action(do_mapr, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);

void * handle_mapw(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    int r = map_action(do_mapw, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
    XSEGLOG2(&lc, D, "Ta: %d", ta);

void * handle_destroy(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    /* request EXCLUSIVE access, but do not force it.
     * check if it succeeded in do_destroy.
     */
    int r = map_action(do_destroy, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);

void * handle_open(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    //here we do not want to load
    int r = map_action(do_open, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);

void * handle_close(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    //here we do not want to load
    int r = map_action(do_close, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);

void * handle_snapshot(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    /* request EXCLUSIVE access, but do not force it.
     * check if it succeeded in do_snapshot.
     */
    int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
                       MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);

void * handle_hash(struct peer_req *pr)
    struct peerd *peer = pr->peer;
    char *target = xseg_get_target(peer->xseg, pr->req);
    /* Do not request exclusive access. Since we are hashing only
     * snapshots, which are read only, there is no need for locking.
     */
    int r = map_action(do_hash, pr, target, pr->req->targetlen,
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
        struct xseg_request *req)
    //struct mapperd *mapper = __get_mapperd(peer);
    struct mapper_io *mio = __get_mapper_io(pr);
    void *(*action)(struct peer_req *) = NULL;

    //mio->state = ACCEPTED;
    mio->count = cur_count;
    switch (pr->req->op) {
        /* primary xseg operations of mapper */
        case X_CLONE: action = handle_clone; break;
        case X_MAPR: action = handle_mapr; break;
        case X_MAPW: action = handle_mapw; break;
        case X_SNAPSHOT: action = handle_snapshot; break;
        case X_INFO: action = handle_info; break;
        case X_DELETE: action = handle_destroy; break;
        case X_OPEN: action = handle_open; break;
        case X_CLOSE: action = handle_close; break;
        case X_HASH: action = handle_hash; break;
        default: fprintf(stderr, "mydispatch: unknown op\n"); break;
    st_thread_create(action, pr, 0, 0);

struct cb_arg {
    struct peer_req *pr;
    struct xseg_request *req;
};

void * callback_caller(struct cb_arg *arg)
    struct peer_req *pr = arg->pr;
    struct xseg_request *req = arg->req;
    struct mapper_io *mio = __get_mapper_io(pr);

int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
        enum dispatch_reason reason)
    struct mapper_io *mio = __get_mapper_io(pr);

    if (reason == dispatch_accept)
        dispatch_accepted(peer, pr, req);
//      mio->cb(pr, req);
    arg = malloc(sizeof(struct cb_arg));
        XSEGLOG2(&lc, E, "Cannot allocate cb_arg");
    st_thread_create(callback_caller, arg, 0, 0);
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
    //FIXME error checks
    struct mapperd *mapper = malloc(sizeof(struct mapperd));
    peer->priv = mapper;
    mapper->hashmaps = xhash_new(3, 0, STRING);

    for (i = 0; i < peer->nr_ops; i++) {
        struct mapper_io *mio = malloc(sizeof(struct mapper_io));
        mio->copyups_nodes = xhash_new(3, 0, INTEGER);
        mio->pending_reqs = 0;
        peer->peer_reqs[i].priv = mio;

    mapper->bportno = -1;
    mapper->mbportno = -1;
    BEGIN_READ_ARGS(argc, argv);
    READ_ARG_ULONG("-bp", mapper->bportno);
    READ_ARG_ULONG("-mbp", mapper->mbportno);
    if (mapper->bportno == -1){
        XSEGLOG2(&lc, E, "Portno for blocker must be provided");
    if (mapper->mbportno == -1){
        XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
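    /* Note: SCHED_FIFO with priority 99 requires CAP_SYS_NICE (or root).
     * The return value of sched_setscheduler() below is not checked, so
     * an unprivileged peer silently keeps the default scheduling policy. */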
    const struct sched_param param = { .sched_priority = 99 };
    sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
    /* FIXME maybe place it in peer
     * should be done for each port (sportno to eportno)
     */
    xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
    xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);

    req_cond = st_cond_new();
/* FIXME this should not be here */
int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
    struct xseg *xseg = peer->xseg;
    xport portno_start = peer->portno_start;
    xport portno_end = peer->portno_end;
    struct peer_req *pr;
    struct xseg_request *received;
    xseg_prepare_wait(xseg, portno_start);
        XSEGLOG2(&lc, D, "Attempting to check for reply");
        for (i = portno_start; i <= portno_end; i++) {
            received = xseg_receive(xseg, i, 0);
                r = xseg_get_req_data(xseg, received, (void **) &pr);
                if (r < 0 || !pr || received != expected_req){
                    XSEGLOG2(&lc, W, "Received request with no pr data\n");
                    xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
                        XSEGLOG2(&lc, W, "Could not respond to stale request");
                        xseg_put_request(xseg, received, portno_start);
                    xseg_signal(xseg, p);
        xseg_cancel_wait(xseg, portno_start);
        xseg_wait_signal(xseg, peer->sd, 1000000UL);
void custom_peer_finalize(struct peerd *peer)
    struct mapperd *mapper = __get_mapperd(peer);
    struct peer_req *pr = alloc_peer_req(peer);
        XSEGLOG2(&lc, E, "Cannot get peer request");
    struct xseg_request *req;
    xhash_iter_init(mapper->hashmaps, &it);
    while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
        map = (struct map *)val;
        if (!(map->state & MF_MAP_EXCLUSIVE))
        req = __close_map(pr, map);
        wait_reply(peer, req);
        if (!(req->state & XS_SERVED))
            XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
        map->state &= ~MF_MAP_CLOSING;
        put_request(pr, req);
void print_obj(struct map_node *mn)
    fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n",
            (unsigned long long) mn->objectidx, mn->object,
            (unsigned int) mn->objectlen,
            (mn->flags & MF_OBJECT_WRITABLE) ? 'y' : 'n');

void print_map(struct map *m)
    uint64_t nr_objs = m->size / MAPPER_DEFAULT_BLOCKSIZE;
    if (m->size % MAPPER_DEFAULT_BLOCKSIZE)
    fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n",
            m->volume, m->volumelen,
            (unsigned long long) m->size,
            (unsigned long long) nr_objs,
    struct map_node *mn;
    if (nr_objs > 1000000) //FIXME to protect against invalid volume size
    for (i = 0; i < nr_objs; i++) {
        mn = find_object(m, i);
            printf("object idx [%llu] not found!\n", (unsigned long long) i);