Strip xseg stuff
[archipelago] / xseg / mapper.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <pthread.h>
39 #include <xseg/xseg.h>
40 #include <peer.h>
41 #include <time.h>
42 #include <xtypes/xhash.h>
43 #include <xseg/protocol.h>
44 //#include <sys/stat.h>
45 //#include <fcntl.h>
46 #include <errno.h>
47 #include <sched.h>
48 #include <sys/syscall.h>
49 #include <hash.h>
50 #include <mapper.h>
51 #include <mapper-versions.h>
52
53 uint64_t cur_count = 0;
54
55 extern st_cond_t req_cond;
56 /* pithos considers this a block full of zeros, so should we.
57  * it is actually the sha256 hash of nothing.
58  */
59 char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
60
61 void custom_peer_usage()
62 {
63         fprintf(stderr, "Custom peer options: \n"
64                         "-bp  : port for block blocker(!)\n"
65                         "-mbp : port for map blocker\n"
66                         "\n");
67 }
68
69
70 /*
71  * Helper functions
72  */
73
74 static uint32_t calc_nr_obj(struct xseg_request *req)
75 {
76         unsigned int r = 1;
77         uint64_t rem_size = req->size;
78         uint64_t obj_offset = req->offset & (MAPPER_DEFAULT_BLOCKSIZE -1); //modulo
79         uint64_t obj_size =  (rem_size + obj_offset > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE - obj_offset : rem_size;
80         rem_size -= obj_size;
81         while (rem_size > 0) {
82                 obj_size = (rem_size > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE : rem_size;
83                 rem_size -= obj_size;
84                 r++;
85         }
86
87         return r;
88 }
89
90 /*
91  * Map cache handling functions
92  */
93
94 static struct map * find_map(struct mapperd *mapper, char *volume)
95 {
96         struct map *m = NULL;
97         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
98                                 (xhashidx *) &m);
99         if (r < 0)
100                 return NULL;
101         return m;
102 }
103
104 static struct map * find_map_len(struct mapperd *mapper, char *target,
105                                         uint32_t targetlen, uint32_t flags)
106 {
107         char buf[XSEG_MAX_TARGETLEN+1];
108
109         if (targetlen > MAX_VOLUME_LEN){
110                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
111                                         targetlen, MAX_VOLUME_LEN);
112                 return NULL;
113         }
114
115         if (flags & MF_ARCHIP){
116                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
117                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
118                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
119                 targetlen += MAPPER_PREFIX_LEN;
120         }
121         else {
122                 strncpy(buf, target, targetlen);
123                 buf[targetlen] = 0;
124         }
125
126         XSEGLOG2(&lc, D, "looking up map %s, len %u",
127                         buf, targetlen);
128         return find_map(mapper, buf);
129 }
130
131
132 static int insert_map(struct mapperd *mapper, struct map *map)
133 {
134         int r = -1;
135
136         if (find_map(mapper, map->volume)){
137                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
138                 goto out;
139         }
140
141         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
142                         map->volume, strlen(map->volume), (unsigned long) map);
143         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
144         while (r == -XHASH_ERESIZE) {
145                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
146                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
147                 if (!new_hashmap){
148                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
149                                         (unsigned long long) shift);
150                         goto out;
151                 }
152                 mapper->hashmaps = new_hashmap;
153                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
154         }
155 out:
156         return r;
157 }
158
159 static int remove_map(struct mapperd *mapper, struct map *map)
160 {
161         int r = -1;
162
163         //assert no pending pr on map
164
165         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
166         while (r == -XHASH_ERESIZE) {
167                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
168                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
169                 if (!new_hashmap){
170                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
171                                         (unsigned long long) shift);
172                         goto out;
173                 }
174                 mapper->hashmaps = new_hashmap;
175                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
176         }
177 out:
178         return r;
179 }
180
181 inline struct map_node * get_mapnode(struct map *map, uint64_t index)
182 {
183         struct map_node *mn;
184         if (index >= map->nr_objs) {
185         //      XSEGLOG2(&lc, E, "Index out of range: %llu > %llu",
186         //                      index, map->nr_objs);
187                 return NULL;
188         }
189         if (!map->objects) {
190         //      XSEGLOG2(&lc, E, "Map %s has no objects", map->volume);
191                 return NULL;
192         }
193         mn = &map->objects[index];
194         mn->ref++;
195         XSEGLOG2(&lc, D,  "mapnode %p: ref: %u", mn, mn->ref);
196         return mn;
197 }
198
199 inline void put_mapnode(struct map_node *mn)
200 {
201         mn->ref--;
202         XSEGLOG2(&lc, D, "mapnode %p: ref: %u", mn, mn->ref);
203         if (!mn->ref){
204                 //clean up mn
205                 st_cond_destroy(mn->cond);
206         }
207 }
208
209 int initialize_map_objects(struct map *map)
210 {
211         uint64_t i;
212         struct map_node *map_node = map->objects;
213
214         if (!map_node)
215                 return -1;
216
217         for (i = 0; i < map->nr_objs; i++) {
218                 map_node[i].map = map;
219                 map_node[i].objectidx = i;
220                 map_node[i].waiters = 0;
221                 map_node[i].state = 0;
222                 map_node[i].ref = 1;
223                 map_node[i].cond = st_cond_new(); //FIXME err check;
224         }
225         return 0;
226 }
227
228
229
230 static inline void __get_map(struct map *map)
231 {
232         map->ref++;
233 }
234
235 static inline void put_map(struct map *map)
236 {
237         struct map_node *mn;
238         XSEGLOG2(&lc, D, "Putting map %lx %s. ref %u", map, map->volume, map->ref);
239         map->ref--;
240         if (!map->ref){
241                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
242                 /*
243                  * Check that every object is not used by another state thread.
244                  * This should always check out, otherwise there is a bug. Since
245                  * before a thread can manipulate an object, it must first get
246                  * the map, the map ref will never hit zero, while another
247                  * thread is using an object.
248                  */
249                 uint64_t i;
250                 for (i = 0; i < map->nr_objs; i++) {
251                         mn = get_mapnode(map, i);
252                         if (mn) {
253                                 //make sure all pending operations on all objects are completed
254                                 if (mn->state & MF_OBJECT_NOT_READY) {
255                                         XSEGLOG2(&lc, E, "BUG: map node in use while freeing map");
256                                         wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
257                                 }
258 //                              mn->state |= MF_OBJECT_DESTROYED;
259                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
260                                 put_mapnode(mn); //matcing get_mapnode;
261                                 //assert mn->ref == 0;
262                                 if (mn->ref) {
263                                         XSEGLOG2(&lc, E, "BUG: map node ref != 0 after final put");
264                                 }
265                         }
266                 }
267                 //clean up map
268                 if (map->objects)
269                         free(map->objects);
270                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
271                 free(map);
272         }
273 }
274
275 static struct map * create_map(char *name, uint32_t namelen, uint32_t flags)
276 {
277         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
278                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
279                                         namelen, MAX_VOLUME_LEN);
280                 return NULL;
281         }
282         struct map *m = malloc(sizeof(struct map));
283         if (!m){
284                 XSEGLOG2(&lc, E, "Cannot allocate map ");
285                 return NULL;
286         }
287         m->size = -1;
288         if (flags & MF_ARCHIP){
289                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
290                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
291                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
292                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
293                 /* Use the latest map version here, when creating a new map. If
294                  * the map is read from storage, this version will be rewritten
295                  * with the right value.
296                  */
297                 m->version = MAP_LATEST_VERSION;
298                 m->flags = 0;
299         }
300         else {
301                 strncpy(m->volume, name, namelen);
302                 m->volume[namelen] = 0;
303                 m->volumelen = namelen;
304                 m->version = 0; /* version 0 should be pithos maps */
305                 m->flags = MF_MAP_READONLY;
306         }
307         m->epoch = 0;
308         m->state = 0;
309         m->nr_objs = 0;
310         m->objects = NULL;
311         m->ref = 1;
312         m->waiters = 0;
313         m->cond = st_cond_new(); //FIXME err check;
314
315         m->users = 0;
316         m->waiters_users = 0;
317         m->users_cond = st_cond_new();
318
319         return m;
320 }
321
322 static void wait_all_map_objects_ready(struct map *map)
323 {
324         uint64_t i;
325         struct map_node *mn;
326
327         //TODO: maybe add counter on the map on how many objects are used, to
328         //speed up the common case, where there are no used objects.
329         map->state |= MF_MAP_SERIALIZING;
330         if (map->users)
331                 wait_all_objects_ready(map);
332
333         for (i = 0; i < map->nr_objs; i++) {
334                 mn = get_mapnode(map, i);
335                 if (mn) {
336                         //make sure all pending operations on all objects are completed
337                         if (mn->state & MF_OBJECT_NOT_READY) {
338                                 XSEGLOG2(&lc, E, "BUG: Map node %x of map %s, "
339                                                 "idx: %llu is not ready",
340                                                 mn, map->volume, i);
341 //                              wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
342                         }
343                         put_mapnode(mn);
344                 }
345         }
346
347         map->state &= ~MF_MAP_SERIALIZING;
348 }
349
350
351 struct r2o {
352         struct map_node *mn;
353         uint64_t offset;
354         uint64_t size;
355 };
356
357 static int do_copyups(struct peer_req *pr, struct r2o *mns, int n)
358 {
359         struct mapper_io *mio = __get_mapper_io(pr);
360         struct map_node *mn;
361         int i, j, can_wait = 0;
362         mio->pending_reqs = 0;
363         mio->cb=copyup_cb;
364         mio->err = 0;
365
366         /* do a first scan and issue as many copyups as we can.
367          * then retry and wait when an object is not ready.
368          * this could be done better, since now we wait also on the
369          * pending copyups
370          */
371         for (j = 0; j < 2 && !mio->err; j++) {
372                 for (i = 0; i < n && !mio->err; i++) {
373                         mn = mns[i].mn;
374                         //do copyups
375                         if (mn->state & MF_OBJECT_NOT_READY){
376                                 if (!can_wait)
377                                         continue;
378                                 /* here mn->flags should be
379                                  * MF_OBJECT_COPYING or MF_OBJECT_WRITING or
380                                  * later MF_OBJECT_HASHING.
381                                  * Otherwise it's a bug.
382                                  */
383                                 if (mn->state != MF_OBJECT_COPYING
384                                                 && mn->state != MF_OBJECT_WRITING) {
385                                         XSEGLOG2(&lc, E, "BUG: Map node has wrong state");
386                                 }
387                                 wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
388 //                              if (mn->state & MF_OBJECT_DESTROYED){
389 //                                      mio->err = 1;
390 //                                      continue;
391 //                              }
392                         }
393
394                         if (!(mn->flags & MF_OBJECT_WRITABLE)) {
395                                 //calc new_target, copy up object
396                                 if (__copyup_object(pr, mn) == NULL){
397                                         XSEGLOG2(&lc, E, "Error in copy up object");
398                                         mio->err = 1;
399                                 } else {
400                                         mio->pending_reqs++;
401                                 }
402                         }
403
404                 }
405                 can_wait = 1;
406         }
407
408         if (mio->err){
409                 XSEGLOG2(&lc, E, "Mio->err, pending_copyups: %d", mio->pending_reqs);
410         }
411
412         if (mio->pending_reqs > 0)
413                 wait_on_pr(pr, mio->pending_reqs > 0);
414
415         return mio->err ? -1 : 0;
416 }
417
418 static int req2objs(struct peer_req *pr, struct map *map, int write)
419 {
420         int r = 0;
421         struct peerd *peer = pr->peer;
422         struct mapper_io *mio = __get_mapper_io(pr);
423         char *target = xseg_get_target(peer->xseg, pr->req);
424         uint32_t nr_objs = calc_nr_obj(pr->req);
425         uint64_t size = sizeof(struct xseg_reply_map) +
426                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
427         uint32_t idx, i;
428         uint64_t rem_size, obj_index, obj_offset, obj_size;
429         struct map_node *mn;
430         char buf[XSEG_MAX_TARGETLEN];
431         struct xseg_reply_map *reply;
432
433         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
434
435         if (pr->req->offset + pr->req->size > map->size) {
436                 XSEGLOG2(&lc, E, "Invalid offset/size: offset: %llu, "
437                                 "size: %llu, map size: %llu",
438                                 pr->req->offset, pr->req->size, map->size);
439                 return -1;
440         }
441
442         /* get map_nodes of request */
443         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
444         if (!mns){
445                 XSEGLOG2(&lc, E, "Cannot allocate mns");
446                 return -1;
447         }
448
449         map->users++;
450
451         idx = 0;
452         rem_size = pr->req->size;
453         obj_index = pr->req->offset / MAPPER_DEFAULT_BLOCKSIZE;
454         obj_offset = pr->req->offset & (MAPPER_DEFAULT_BLOCKSIZE -1); //modulo
455         obj_size =  (obj_offset + rem_size > MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE - obj_offset : rem_size;
456         mn = get_mapnode(map, obj_index);
457         if (!mn) {
458                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n",
459                                 (unsigned long long) obj_index);
460                 r = -1;
461                 goto out;
462         }
463         mns[idx].mn = mn;
464         mns[idx].offset = obj_offset;
465         mns[idx].size = obj_size;
466         rem_size -= obj_size;
467         while (rem_size > 0) {
468                 idx++;
469                 obj_index++;
470                 obj_offset = 0;
471                 obj_size = (rem_size >  MAPPER_DEFAULT_BLOCKSIZE) ? MAPPER_DEFAULT_BLOCKSIZE : rem_size;
472                 rem_size -= obj_size;
473                 mn = get_mapnode(map, obj_index);
474                 if (!mn) {
475                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
476                         r = -1;
477                         goto out;
478                 }
479                 mns[idx].mn = mn;
480                 mns[idx].offset = obj_offset;
481                 mns[idx].size = obj_size;
482         }
483         if (write) {
484                 if (do_copyups(pr, mns, idx+1) < 0) {
485                         r = -1;
486                         XSEGLOG2(&lc, E, "do_copyups failed");
487                         goto out;
488                 }
489         }
490
491         /* resize request to fit reply */
492         strncpy(buf, target, pr->req->targetlen);
493         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
494         if (r < 0) {
495                 XSEGLOG2(&lc, E, "Cannot resize request");
496                 goto out;
497         }
498         target = xseg_get_target(peer->xseg, pr->req);
499         strncpy(target, buf, pr->req->targetlen);
500
501         /* structure reply */
502         reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
503         reply->cnt = nr_objs;
504         for (i = 0; i < (idx+1); i++) {
505                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
506                 reply->segs[i].targetlen = mns[i].mn->objectlen;
507                 reply->segs[i].offset = mns[i].offset;
508                 reply->segs[i].size = mns[i].size;
509         }
510 out:
511         for (i = 0; i < (idx+1); i++) {
512                 put_mapnode(mns[i].mn);
513         }
514         free(mns);
515         mio->cb = NULL;
516         if (--map->users){
517                 signal_all_objects_ready(map);
518         }
519         return r;
520 }
521
522 static int do_info(struct peer_req *pr, struct map *map)
523 {
524         struct peerd *peer = pr->peer;
525         struct xseg_reply_info *xinfo;
526         struct xseg_request *req = pr->req;
527         char buf[XSEG_MAX_TARGETLEN + 1];
528         char *target;
529         int r;
530
531         if (req->datalen < sizeof(struct xseg_reply_info)) {
532                 target = xseg_get_target(peer->xseg, req);
533                 strncpy(buf, target, req->targetlen);
534                 r = xseg_resize_request(peer->xseg, req, req->targetlen, sizeof(struct xseg_reply_info));
535                 if (r < 0) {
536                         XSEGLOG2(&lc, E, "Cannot resize request");
537                         return -1;
538                 }
539                 target = xseg_get_target(peer->xseg, req);
540                 strncpy(target, buf, req->targetlen);
541         }
542
543         xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, req);
544         xinfo->size = map->size;
545         return 0;
546 }
547
548
549 static int do_open(struct peer_req *pr, struct map *map)
550 {
551         if (map->state & MF_MAP_EXCLUSIVE) {
552                 return 0;
553         }
554         else {
555                 return -1;
556         }
557 }
558
559
560 static int dropcache(struct peer_req *pr, struct map *map)
561 {
562         int r;
563         struct peerd *peer = pr->peer;
564         struct mapperd *mapper = __get_mapperd(peer);
565         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
566         /*
567          * We can lazily drop the cache from here, by just removing from the maps
568          * hashmap making it inaccessible from future requests. This is because:
569          *
570          * a) Dropping cache for a map is serialized on a map level. So there
571          * should not be any other threds modifying the struct map.
572          *
573          * b) Any other thread manipulating the map nodes should not have
574          * any pending requests on the map node, if the map is not opened
575          * exclusively. If that's the case, then we should not close the map,
576          * a.k.a. releasing the map lock without checking for any pending
577          * requests. Furthermore, since each operation on a map gets a map
578          * reference, the memory will not be freed, unless every request has
579          * finished processing the map.
580          */
581
582         /* Set map as destroyed to notify any waiters that hold a reference to
583          * the struct map.
584          */
585         //FIXME err check
586         r = remove_map(mapper, map);
587         if (r < 0) {
588                 XSEGLOG2(&lc, E, "Remove map %s from hashmap failed", map->volume);
589                 XSEGLOG2(&lc, E, "Dropping cache for map %s failed", map->volume);
590                 return -1;
591         }
592         map->state |= MF_MAP_DESTROYED;
593         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
594         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
595         return 0;
596 }
597
598 static int do_close(struct peer_req *pr, struct map *map)
599 {
600         if (!(map->state & MF_MAP_EXCLUSIVE)) {
601                 XSEGLOG2(&lc, E, "Attempted to close a not opened map");
602                 return -1;
603         }
604
605         /* Do not close the map while there are pending requests on the
606          * map nodes.
607          */
608         wait_all_map_objects_ready(map);
609         if (close_map(pr, map) < 0) {
610                 return -1;
611         }
612
613         return 0;
614 }
615
616 static int do_hash(struct peer_req *pr, struct map *map)
617 {
618         int r;
619         struct peerd *peer = pr->peer;
620         uint64_t i, bufsize;
621         struct map *hashed_map;
622         unsigned char sha[SHA256_DIGEST_SIZE];
623         unsigned char *buf = NULL;
624         char newvolumename[MAX_VOLUME_LEN];
625         uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
626         uint64_t pos = 0;
627         char targetbuf[XSEG_MAX_TARGETLEN];
628         char *target;
629         struct xseg_reply_hash *xreply;
630         struct map_node *mn;
631
632         if (!(map->flags & MF_MAP_READONLY)) {
633                 XSEGLOG2(&lc, E, "Cannot hash live volumes");
634                 return -1;
635         }
636
637         XSEGLOG2(&lc, I, "Hashing map %s", map->volume);
638         /* prepare hashed_map holder */
639         hashed_map = create_map("", 0, 0);
640         if (!hashed_map) {
641                 XSEGLOG2(&lc, E, "Cannot create hashed map");
642                 return -1;
643         }
644
645         /* set map metadata */
646         hashed_map->size = map->size;
647         hashed_map->nr_objs = map->nr_objs;
648         hashed_map->flags = MF_MAP_READONLY;
649         hashed_map->blocksize = MAPPER_DEFAULT_BLOCKSIZE; /* FIXME, this should be PITHOS_BLOCK_SIZE right? */
650
651         hashed_map->objects = calloc(map->nr_objs, sizeof(struct map_node));
652         if (!hashed_map->objects) {
653                 XSEGLOG2(&lc, E, "Cannot allocate memory for %llu nr_objs",
654                                 hashed_map->nr_objs);
655                 r = -1;
656                 goto out;
657         }
658
659         r = initialize_map_objects(hashed_map);
660         if (r < 0) {
661                 XSEGLOG2(&lc, E, "Cannot initialize hashed_map objects");
662                 goto out;
663         }
664
665         r = hash_map(pr, map, hashed_map);
666         if (r < 0) {
667                 XSEGLOG2(&lc, E, "Cannot hash map %s", map->volume);
668                 goto out;
669         }
670
671         bufsize = hashed_map->nr_objs * v0_objectsize_in_map;
672
673         buf = malloc(bufsize);
674         if (!buf) {
675                 XSEGLOG2(&lc, E, "Cannot allocate merkle_hash buffer of %llu bytes",
676                                 bufsize);
677                 goto out;
678         }
679         for (i = 0; i < hashed_map->nr_objs; i++) {
680                 mn = get_mapnode(hashed_map, i);
681                 if (!mn){
682                         XSEGLOG2(&lc, E, "Cannot get object %llu for map %s",
683                                         i, hashed_map->volume);
684                         goto out;
685                 }
686                 map_functions[0].object_to_map(buf+pos, mn);
687                 pos += v0_objectsize_in_map;
688                 put_mapnode(mn);
689         }
690
691         merkle_hash(buf, pos, sha);
692         hexlify(sha, SHA256_DIGEST_SIZE, newvolumename);
693         strncpy(hashed_map->volume, newvolumename, newvolumenamelen);
694         hashed_map->volume[newvolumenamelen] = 0;
695         hashed_map->volumelen = newvolumenamelen;
696
697         /* write the hashed_map */
698         r = write_map(pr, hashed_map);
699         if (r < 0) {
700                 XSEGLOG2(&lc, E, "Cannot write hashed_map %s", hashed_map->volume);
701                 goto out;
702         }
703
704         /* Resize request to fit xhash reply */
705         target = xseg_get_target(peer->xseg, pr->req);
706         strncpy(targetbuf, target, pr->req->targetlen);
707
708         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
709                         sizeof(struct xseg_reply_hash));
710         if (r < 0){
711                 XSEGLOG2(&lc, E, "Cannot resize request");
712                 goto out;
713         }
714
715         target = xseg_get_target(peer->xseg, pr->req);
716         strncpy(target, targetbuf, pr->req->targetlen);
717
718         /* Put the target of the hashed_map on the reply */
719         xreply = (struct xseg_reply_hash *) xseg_get_data(peer->xseg, pr->req);
720         strncpy(xreply->target, newvolumename, newvolumenamelen);
721         xreply->targetlen = newvolumenamelen;
722
723 out:
724         if (buf)
725                 free(buf);
726         put_map(hashed_map);
727         if (r < 0) {
728                 return -1;
729         } else {
730                 return 0;
731         }
732 }
733
734 static int do_snapshot(struct peer_req *pr, struct map *map)
735 {
736         uint64_t i;
737         struct peerd *peer = pr->peer;
738         //struct mapper_io *mio = __get_mapper_io(pr);
739         struct map_node *mn;
740         uint64_t nr_objs;
741         struct map *snap_map;
742         struct xseg_request_snapshot *xsnapshot;
743         char *snapname;
744         uint32_t snapnamelen;
745         int r;
746
747         xsnapshot = (struct xseg_request_snapshot *)xseg_get_data(peer->xseg, pr->req);
748         if (!xsnapshot) {
749                 return -1;
750         }
751         snapname = xsnapshot->target;
752         snapnamelen = xsnapshot->targetlen;
753
754         if (!snapnamelen) {
755                 XSEGLOG2(&lc, E, "Snapshot name must be provided");
756                 return -1;
757         }
758
759         if (!(map->state & MF_MAP_EXCLUSIVE)) {
760                 XSEGLOG2(&lc, E, "Map was not opened exclusively");
761                 return -1;
762         }
763         if (map->epoch == UINT64_MAX) {
764                 XSEGLOG2(&lc, E, "Max epoch reached for %s", map->volume);
765                 return -1;
766         }
767         XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
768         map->state |= MF_MAP_SNAPSHOTTING;
769
770         //create new map struct with name snapshot name and flag readonly.
771         snap_map = create_map(snapname, snapnamelen, MF_ARCHIP);
772         if (!snap_map) {
773                 goto out_err;
774         }
775
776         //open/load map to check if snap exists
777         r = open_map(pr, snap_map, 0);
778         if (r < 0) {
779                 XSEGLOG2(&lc, E, "Could not open snap map");
780                 XSEGLOG2(&lc, E, "Snapshot exists");
781                 goto out_put;
782         }
783         r = load_map_metadata(pr, snap_map);
784         if (r >= 0 && !(map->flags & MF_MAP_DELETED)) {
785                 XSEGLOG2(&lc, E, "Snapshot exists");
786                 goto out_close;
787         }
788         snap_map->epoch = 0;
789         //snap_map->flags &= ~MF_MAP_DELETED;
790         snap_map->flags = MF_MAP_READONLY;
791         snap_map->objects = map->objects;
792         snap_map->size = map->size;
793         snap_map->blocksize = map->blocksize;
794         snap_map->nr_objs = map->nr_objs;
795
796
797         nr_objs = map->nr_objs;
798
799         //set all map_nodes read only;
800         //TODO, maybe skip that check and add an epoch number on each object.
801         //Then we can check if object is writable iff object epoch == map epoch
802         wait_all_map_objects_ready(map);
803         for (i = 0; i < nr_objs; i++) {
804                 mn = get_mapnode(map, i);
805                 if (!mn) {
806                         XSEGLOG2(&lc, E, "Could not get map node %llu for map %s",
807                                         i, map->volume);
808                         goto out_err;
809                 }
810
811                 // make sure all pending operations on all objects are completed
812                 // Basically make sure, that no previously copy up operation,
813                 // will mess with our state.
814                 // This works, since only a map_w, that was processed before
815                 // this request, can have issued an object write request which
816                 // may be pending. Since the objects are processed in the same
817                 // order by the copyup operation and the snapshot operation, we
818                 // can be sure, that no previously ready objects, have changed
819                 // their state into not read.
820                 // No other operation that manipulated map objects can occur
821                 // simutaneously with snapshot operation.
822                 if (mn->state & MF_OBJECT_NOT_READY)
823                         XSEGLOG2(&lc, E, "BUG: object not ready");
824         //              wait_on_mapnode(mn, mn->state & MF_OBJECT_NOT_READY);
825
826                 mn->flags &= ~MF_OBJECT_WRITABLE;
827                 put_mapnode(mn);
828         }
829         //increase epoch
830         map->epoch++;
831         //write map
832         r = write_map(pr, map);
833         if (r < 0) {
834                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
835                 /* Not restoring epoch or writable status here, is not
836                  * devastating, since this is not the common case, and it can
837                  * only cause unneeded copy-on-write operations.
838                  */
839                 goto out_err;
840         }
841         //write snapshot map
842         r = write_map(pr, snap_map);
843         if (r < 0) {
844                 XSEGLOG2(&lc, E, "Write of snapshot map failed");
845                 goto out_unset;
846         }
847
848         close_map(pr, snap_map);
849         snap_map->objects = NULL;
850         put_map(snap_map);
851
852         map->state &= ~MF_MAP_SNAPSHOTTING;
853
854         if (map->opened_count == cur_count)
855                 close_map(pr, map);
856
857         XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
858         return 0;
859
860 out_unset:
861         snap_map->objects = NULL;
862 out_close:
863         close_map(pr, snap_map);
864 out_put:
865         put_map(snap_map);
866 out_err:
867         map->state &= ~MF_MAP_SNAPSHOTTING;
868         XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
869         return -1;
870 }
871
872 /* This should probably me a map function */
873 static int do_destroy(struct peer_req *pr, struct map *map)
874 {
875         //uint64_t i, nr_obj;
876         //struct peerd *peer = pr->peer;
877         //struct mapper_io *mio = __get_mapper_io(pr);
878         //struct map_node *mn;
879         //struct xseg_request *req;
880         int r;
881
882         if (!(map->state & MF_MAP_EXCLUSIVE))
883                 return -1;
884
885         if (map->flags & MF_MAP_DELETED) {
886                 XSEGLOG2(&lc, E, "Map %s already deleted", map->volume);
887                 do_close(pr, map);
888                 return -1;
889         }
890
891         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
892         map->state |= MF_MAP_DELETING;
893         map->flags |= MF_MAP_DELETED;
894         /* Just write map here. Only thing that matters are the map flags, which
895          * will not be overwritten by any other concurrent map write which can
896          * be caused by a copy up. Also if by any chance, the volume is
897          * recreated and there are pending copy ups from the old node, they will
898          * not mess with the new one. So let's just be fast.
899          */
900         /* we could write only metadata here to speed things up*/
901         /* Also, we could delete/truncate the unnecessary map blocks, aka all but
902          * metadata, but that would require to make sure there are no pending
903          * operations on any block, aka wait_all_map_objects_ready(). Or we can do
904          * it later, with garbage collection.
905          */
906         r = write_map_metadata(pr, map);
907         if (r < 0){
908                 map->state &= ~MF_MAP_DELETING;
909                 XSEGLOG2(&lc, E, "Failed to destroy map %s", map->volume);
910                 return -1;
911         }
912
913         map->state &= ~MF_MAP_DELETING;
914         XSEGLOG2(&lc, I, "Deleted map %s", map->volume);
915         /* do close will drop the map from cache  */
916
917         do_close(pr, map);
918         /* if do_close fails, an error message will be logged, but the deletion
919          * was successfull, and there isn't much to do about the error.
920          */
921         return 0;
922 }
923
924 static int do_mapr(struct peer_req *pr, struct map *map)
925 {
926         struct peerd *peer = pr->peer;
927         int r = req2objs(pr, map, 0);
928         if  (r < 0){
929                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
930                                 map->volume, 
931                                 (unsigned long long) pr->req->offset, 
932                                 (unsigned long long) (pr->req->offset + pr->req->size));
933                 return -1;
934         }
935         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
936                         map->volume, 
937                         (unsigned long long) pr->req->offset, 
938                         (unsigned long long) (pr->req->offset + pr->req->size));
939         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
940                         (unsigned long long) pr->req->offset,
941                         (unsigned long long) pr->req->size);
942         char buf[XSEG_MAX_TARGETLEN+1];
943         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
944         int i;
945         for (i = 0; i < reply->cnt; i++) {
946                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
947                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
948                 buf[reply->segs[i].targetlen] = 0;
949                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
950                                 (unsigned long long) reply->segs[i].offset,
951                                 (unsigned long long) reply->segs[i].size);
952         }
953         return 0;
954 }
955
956 static int do_mapw(struct peer_req *pr, struct map *map)
957 {
958         struct peerd *peer = pr->peer;
959         int r;
960         if (map->flags & MF_MAP_READONLY) {
961                 XSEGLOG2(&lc, E, "Cannot write to a read only map");
962                 return -1;
963         }
964         r = req2objs(pr, map, 1);
965         if  (r < 0){
966                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
967                                 map->volume, 
968                                 (unsigned long long) pr->req->offset, 
969                                 (unsigned long long) (pr->req->offset + pr->req->size));
970                 return -1;
971         }
972         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
973                         map->volume, 
974                         (unsigned long long) pr->req->offset, 
975                         (unsigned long long) (pr->req->offset + pr->req->size));
976         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
977                         (unsigned long long) pr->req->offset,
978                         (unsigned long long) pr->req->size);
979         char buf[XSEG_MAX_TARGETLEN+1];
980         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
981         int i;
982         for (i = 0; i < reply->cnt; i++) {
983                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
984                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
985                 buf[reply->segs[i].targetlen] = 0;
986                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
987                                 (unsigned long long) reply->segs[i].offset,
988                                 (unsigned long long) reply->segs[i].size);
989         }
990         return 0;
991 }
992
993 //here map is the parent map
994 static int do_clone(struct peer_req *pr, struct map *map)
995 {
996         long i, c;
997         int r;
998         struct peerd *peer = pr->peer;
999         //struct mapperd *mapper = __get_mapperd(peer);
1000         char *target = xseg_get_target(peer->xseg, pr->req);
1001         struct map *clonemap;
1002         struct map_node *map_nodes, *mn;
1003         struct xseg_request_clone *xclone =
1004                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1005
1006         if (!(map->flags & MF_MAP_READONLY)) {
1007                 XSEGLOG2(&lc, E, "Cloning is supported only from a snapshot");
1008                 return -1;
1009         }
1010
1011         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1012         clonemap = create_map(target, pr->req->targetlen, MF_ARCHIP);
1013         if (!clonemap) {
1014                 XSEGLOG2(&lc, E, "Create map %s failed");
1015                 return -1;
1016         }
1017
1018         /* open map to get exclusive access to map */
1019         r = open_map(pr, clonemap, 0);
1020         if (r < 0){
1021                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
1022                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
1023                 goto out_put;
1024         }
1025         r = load_map_metadata(pr, clonemap);
1026         if (r >= 0 && !(clonemap->flags & MF_MAP_DELETED)) {
1027                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
1028                 goto out_close;
1029         }
1030
1031         /* Make sure, we can take at least one snapshot of the new volume */
1032         if (map->epoch >= UINT64_MAX - 2) {
1033                 XSEGLOG2(&lc, E, "Max epoch reached for %s", clonemap->volume);
1034                 goto out_close;
1035         }
1036         clonemap->flags &= ~MF_MAP_DELETED;
1037         clonemap->epoch++;
1038
1039         if (!(xclone->size))
1040                 clonemap->size = map->size;
1041         else
1042                 clonemap->size = xclone->size;
1043         if (clonemap->size < map->size){
1044                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1045                                 "\n\t for requested clone %s",
1046                                 (unsigned long long) clonemap->size,
1047                                 (unsigned long long) map->size, clonemap->volume);
1048                 goto out_close;
1049         }
1050
1051         clonemap->blocksize = MAPPER_DEFAULT_BLOCKSIZE;
1052         //alloc and init map_nodes
1053         c = calc_map_obj(clonemap);
1054         map_nodes = calloc(c, sizeof(struct map_node));
1055         if (!map_nodes){
1056                 goto out_close;
1057         }
1058         clonemap->objects = map_nodes;
1059         clonemap->nr_objs = c;
1060         for (i = 0; i < c; i++) {
1061                 mn = get_mapnode(map, i);
1062                 if (mn) {
1063                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1064                         map_nodes[i].objectlen = mn->objectlen;
1065                         map_nodes[i].flags = 0;
1066                         if (mn->flags & MF_OBJECT_ARCHIP)
1067                                 map_nodes[i].flags |= MF_OBJECT_ARCHIP;
1068                         if (mn->flags & MF_OBJECT_ZERO)
1069                                 map_nodes[i].flags |= MF_OBJECT_ZERO;
1070                         put_mapnode(mn);
1071                 } else {
1072                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1073                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1074                         map_nodes[i].flags = MF_OBJECT_ZERO;
1075                 }
1076                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1077                 map_nodes[i].state = 0;
1078                 map_nodes[i].objectidx = i;
1079                 map_nodes[i].map = clonemap;
1080                 map_nodes[i].ref = 1;
1081                 map_nodes[i].waiters = 0;
1082                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1083         }
1084
1085         r = write_map(pr, clonemap);
1086         if (r < 0){
1087                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1088                 goto out_close;
1089         }
1090
1091         XSEGLOG2(&lc, I, "Cloning map %s to %s completed",
1092                         map->volume, clonemap->volume);
1093         close_map(pr, clonemap);
1094         put_map(clonemap);
1095         return 0;
1096
1097 out_close:
1098         close_map(pr, clonemap);
1099 out_put:
1100         put_map(clonemap);
1101         return -1;
1102 }
1103
1104 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1105 {
1106         int r, opened = 0;
1107         if (flags & MF_EXCLUSIVE){
1108                 r = open_map(pr, map, flags);
1109                 if (r < 0) {
1110                         if (flags & MF_FORCE){
1111                                 return -1;
1112                         }
1113                 } else {
1114                         opened = 1;
1115                 }
1116         }
1117         r = load_map(pr, map);
1118         if (r < 0 && opened){
1119                 close_map(pr, map);
1120         }
1121         return r;
1122 }
1123
1124 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1125                         uint32_t flags)
1126 {
1127         int r;
1128         struct peerd *peer = pr->peer;
1129         struct mapperd *mapper = __get_mapperd(peer);
1130         struct map *map = find_map_len(mapper, name, namelen, flags);
1131         if (!map) {
1132                 if (flags & MF_LOAD){
1133                         map = create_map(name, namelen, flags);
1134                         if (!map)
1135                                 return NULL;
1136                         r = insert_map(mapper, map);
1137                         if (r < 0){
1138                                 XSEGLOG2(&lc, E, "Cannot insert map %s", map->volume);
1139                                 put_map(map);
1140                         }
1141                         __get_map(map);
1142                         r = open_load_map(pr, map, flags);
1143                         if (r < 0){
1144                                 dropcache(pr, map);
1145                                 /* signal map here, so any other threads that
1146                                  * tried to get the map, but couldn't because
1147                                  * of the opening or loading operation that
1148                                  * failed, can continue.
1149                                  */
1150                                 signal_map(map);
1151                                 put_map(map);
1152                                 return NULL;
1153                         }
1154                         /* If the map is deleted, drop everything and return
1155                          * NULL.
1156                          */
1157                         if (map->flags & MF_MAP_DELETED){
1158                                 XSEGLOG2(&lc, E, "Loaded deleted map %s. Failing...",
1159                                                 map->volume);
1160                                 do_close(pr, map);
1161                                 dropcache(pr, map);
1162                                 signal_map(map);
1163                                 put_map(map);
1164                                 return NULL;
1165                         }
1166                         return map;
1167                 } else {
1168                         return NULL;
1169                 }
1170         } else {
1171                 __get_map(map);
1172         }
1173         return map;
1174
1175 }
1176
1177 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1178                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1179 {
1180         //struct peerd *peer = pr->peer;
1181         struct map *map;
1182 start:
1183         map = get_map(pr, name, namelen, flags);
1184         if (!map)
1185                 return -1;
1186         if (map->state & MF_MAP_NOT_READY){
1187                 wait_on_map(map, (map->state & MF_MAP_NOT_READY));
1188                 put_map(map);
1189                 goto start;
1190         }
1191         int r = action(pr, map);
1192         //always drop cache if map not read exclusively
1193         if (!(map->state & MF_MAP_EXCLUSIVE))
1194                 dropcache(pr, map);
1195         signal_map(map);
1196         put_map(map);
1197         return r;
1198 }
1199
1200 void * handle_info(struct peer_req *pr)
1201 {
1202         struct peerd *peer = pr->peer;
1203         char *target = xseg_get_target(peer->xseg, pr->req);
1204         int r = map_action(do_info, pr, target, pr->req->targetlen,
1205                                 MF_ARCHIP|MF_LOAD);
1206         if (r < 0)
1207                 fail(peer, pr);
1208         else
1209                 complete(peer, pr);
1210         ta--;
1211         return NULL;
1212 }
1213
1214 void * handle_clone(struct peer_req *pr)
1215 {
1216         int r;
1217         struct peerd *peer = pr->peer;
1218         //struct mapperd *mapper = __get_mapperd(peer);
1219         struct xseg_request_clone *xclone;
1220         xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1221         if (!xclone) {
1222                 r = -1;
1223                 goto out;
1224         }
1225
1226         if (xclone->targetlen){
1227                 /* if snap was defined */
1228                 if (pr->req->flags & XF_CONTADDR)
1229                         r = map_action(do_clone, pr, xclone->target,
1230                                         xclone->targetlen, MF_LOAD);
1231                 else
1232                         r = map_action(do_clone, pr, xclone->target,
1233                                         xclone->targetlen, MF_LOAD|MF_ARCHIP);
1234         } else {
1235                 /* else try to create a new volume */
1236                 XSEGLOG2(&lc, I, "Creating volume");
1237                 if (!xclone->size){
1238                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
1239                         r = -1;
1240                         goto out;
1241                 }
1242                 struct map *map;
1243                 char *target = xseg_get_target(peer->xseg, pr->req);
1244
1245                 //create a new empty map of size
1246                 map = create_map(target, pr->req->targetlen, MF_ARCHIP);
1247                 if (!map) {
1248                         r = -1;
1249                         goto out;
1250                 }
1251                 /* open map to get exclusive access to map */
1252                 r = open_map(pr, map, 0);
1253                 if (r < 0) {
1254                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
1255                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
1256                         put_map(map);
1257                         r = -1;
1258                         goto out;
1259                 }
1260                 r = load_map_metadata(pr, map);
1261                 if (r >= 0 && !(map->flags & MF_MAP_DELETED)) {
1262                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
1263                         close_map(pr, map);
1264                         put_map(map);
1265                         r = -1;
1266                         goto out;
1267                 }
1268                 if (map->epoch >= UINT64_MAX - 2) {
1269                         XSEGLOG2(&lc, E, "Max epoch reached for %s", map->volume);
1270                         close_map(pr, map);
1271                         put_map(map);
1272                         r = -1;
1273                         goto out;
1274                 }
1275                 map->epoch++;
1276                 map->flags &= ~MF_MAP_DELETED;
1277                 map->size = xclone->size;
1278                 map->blocksize = MAPPER_DEFAULT_BLOCKSIZE;
1279                 map->nr_objs = calc_map_obj(map);
1280                 uint64_t nr_objs = map->nr_objs;
1281                 //populate_map with zero objects;
1282
1283                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1284                 if (!map_nodes){
1285                         XSEGLOG2(&lc, E, "Cannot allocate %llu nr_objs", nr_objs);
1286                         close_map(pr, map);
1287                         put_map(map);
1288                         r = -1;
1289                         goto out;
1290                 }
1291                 map->objects = map_nodes;
1292
1293                 uint64_t i;
1294                 for (i = 0; i < nr_objs; i++) {
1295                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1296                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1297                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1298                         map_nodes[i].flags = MF_OBJECT_ZERO ; //MF_OBJECT_ARCHIP;
1299                         map_nodes[i].state = 0;
1300                         map_nodes[i].objectidx = i;
1301                         map_nodes[i].map = map;
1302                         map_nodes[i].ref = 1;
1303                         map_nodes[i].waiters = 0;
1304                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1305                 }
1306                 r = write_map(pr, map);
1307                 if (r < 0){
1308                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1309                         close_map(pr, map);
1310                         put_map(map);
1311                         goto out;
1312                 }
1313                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1314                 r = 0;
1315                 close_map(pr, map);
1316                 put_map(map);
1317         }
1318 out:
1319         if (r < 0)
1320                 fail(peer, pr);
1321         else
1322                 complete(peer, pr);
1323         ta--;
1324         return NULL;
1325 }
1326
1327 void * handle_mapr(struct peer_req *pr)
1328 {
1329         struct peerd *peer = pr->peer;
1330         char *target = xseg_get_target(peer->xseg, pr->req);
1331         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1332                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1333         if (r < 0)
1334                 fail(peer, pr);
1335         else
1336                 complete(peer, pr);
1337         ta--;
1338         return NULL;
1339 }
1340
1341 void * handle_mapw(struct peer_req *pr)
1342 {
1343         struct peerd *peer = pr->peer;
1344         char *target = xseg_get_target(peer->xseg, pr->req);
1345         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1346                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1347         if (r < 0)
1348                 fail(peer, pr);
1349         else
1350                 complete(peer, pr);
1351         XSEGLOG2(&lc, D, "Ta: %d", ta);
1352         ta--;
1353         return NULL;
1354 }
1355
1356 void * handle_destroy(struct peer_req *pr)
1357 {
1358         struct peerd *peer = pr->peer;
1359         char *target = xseg_get_target(peer->xseg, pr->req);
1360         /* request EXCLUSIVE access, but do not force it.
1361          * check if succeeded on do_destroy
1362          */
1363         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
1364                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1365         if (r < 0)
1366                 fail(peer, pr);
1367         else
1368                 complete(peer, pr);
1369         ta--;
1370         return NULL;
1371 }
1372
1373 void * handle_open(struct peer_req *pr)
1374 {
1375         struct peerd *peer = pr->peer;
1376         char *target = xseg_get_target(peer->xseg, pr->req);
1377         //here we do not want to load
1378         int r = map_action(do_open, pr, target, pr->req->targetlen,
1379                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1380         if (r < 0)
1381                 fail(peer, pr);
1382         else
1383                 complete(peer, pr);
1384         ta--;
1385         return NULL;
1386 }
1387
1388 void * handle_close(struct peer_req *pr)
1389 {
1390         struct peerd *peer = pr->peer;
1391         char *target = xseg_get_target(peer->xseg, pr->req);
1392         //here we do not want to load
1393         int r = map_action(do_close, pr, target, pr->req->targetlen,
1394                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
1395         if (r < 0)
1396                 fail(peer, pr);
1397         else
1398                 complete(peer, pr);
1399         ta--;
1400         return NULL;
1401 }
1402
1403 void * handle_snapshot(struct peer_req *pr)
1404 {
1405         struct peerd *peer = pr->peer;
1406         char *target = xseg_get_target(peer->xseg, pr->req);
1407         /* request EXCLUSIVE access, but do not force it.
1408          * check if succeeded on do_snapshot
1409          */
1410         int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
1411                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1412         if (r < 0)
1413                 fail(peer, pr);
1414         else
1415                 complete(peer, pr);
1416         ta--;
1417         return NULL;
1418 }
1419
1420 void * handle_hash(struct peer_req *pr)
1421 {
1422         struct peerd *peer = pr->peer;
1423         char *target = xseg_get_target(peer->xseg, pr->req);
1424         /* Do not request exclusive access. Since we are hashing only shapshots
1425          * which are read only, there is no need for locking
1426          */
1427         int r = map_action(do_hash, pr, target, pr->req->targetlen,
1428                                 MF_ARCHIP|MF_LOAD);
1429         if (r < 0)
1430                 fail(peer, pr);
1431         else
1432                 complete(peer, pr);
1433         ta--;
1434         return NULL;
1435 }
1436
1437 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
1438                         struct xseg_request *req)
1439 {
1440         //struct mapperd *mapper = __get_mapperd(peer);
1441         struct mapper_io *mio = __get_mapper_io(pr);
1442         void *(*action)(struct peer_req *) = NULL;
1443
1444         //mio->state = ACCEPTED;
1445         mio->err = 0;
1446         mio->cb = NULL;
1447         cur_count++;
1448         mio->count = cur_count;
1449         switch (pr->req->op) {
1450                 /* primary xseg operations of mapper */
1451                 case X_CLONE: action = handle_clone; break;
1452                 case X_MAPR: action = handle_mapr; break;
1453                 case X_MAPW: action = handle_mapw; break;
1454                 case X_SNAPSHOT: action = handle_snapshot; break;
1455                 case X_INFO: action = handle_info; break;
1456                 case X_DELETE: action = handle_destroy; break;
1457                 case X_OPEN: action = handle_open; break;
1458                 case X_CLOSE: action = handle_close; break;
1459                 case X_HASH: action = handle_hash; break;
1460                 default: fprintf(stderr, "mydispatch: unknown op\n"); break;
1461         }
1462         if (action){
1463                 ta++;
1464                 mio->active = 1;
1465                 st_thread_create(action, pr, 0, 0);
1466         }
1467         return 0;
1468
1469 }
1470
1471 struct cb_arg {
1472         struct peer_req *pr;
1473         struct xseg_request *req;
1474 };
1475
1476 void * callback_caller(struct cb_arg *arg)
1477 {
1478         struct peer_req *pr = arg->pr;
1479         struct xseg_request *req = arg->req;
1480         struct mapper_io *mio = __get_mapper_io(pr);
1481
1482         mio->cb(pr, req);
1483         free(arg);
1484         ta--;
1485         return NULL;
1486 }
1487
1488 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1489                 enum dispatch_reason reason)
1490 {
1491         struct mapper_io *mio = __get_mapper_io(pr);
1492         struct cb_arg *arg;
1493
1494         if (reason == dispatch_accept)
1495                 dispatch_accepted(peer, pr, req);
1496         else {
1497                 if (mio->cb){
1498 //                      mio->cb(pr, req);
1499                         arg = malloc(sizeof(struct cb_arg));
1500                         if (!arg) {
1501                                 XSEGLOG2(&lc, E, "Cannot allocate cb_arg");
1502                                 return -1;
1503                         }
1504                         arg->pr = pr;
1505                         arg->req = req;
1506                         ta++;
1507                 //      mio->active = 1;
1508                         st_thread_create(callback_caller, arg, 0, 0);
1509                 } else {
1510                         signal_pr(pr);
1511                 }
1512         }
1513         return 0;
1514 }
1515
1516 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1517 {
1518         int i;
1519
1520         //FIXME error checks
1521         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1522         peer->priv = mapper;
1523         //mapper = mapperd;
1524         mapper->hashmaps = xhash_new(3, 0, STRING);
1525
1526         for (i = 0; i < peer->nr_ops; i++) {
1527                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1528                 mio->copyups_nodes = xhash_new(3, 0, INTEGER);
1529                 mio->pending_reqs = 0;
1530                 mio->err = 0;
1531                 mio->active = 0;
1532                 peer->peer_reqs[i].priv = mio;
1533         }
1534
1535         mapper->bportno = -1;
1536         mapper->mbportno = -1;
1537         BEGIN_READ_ARGS(argc, argv);
1538         READ_ARG_ULONG("-bp", mapper->bportno);
1539         READ_ARG_ULONG("-mbp", mapper->mbportno);
1540         END_READ_ARGS();
1541         if (mapper->bportno == -1){
1542                 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
1543                 usage(argv[0]);
1544                 return -1;
1545         }
1546         if (mapper->mbportno == -1){
1547                 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
1548                 usage(argv[0]);
1549                 return -1;
1550         }
1551
1552         const struct sched_param param = { .sched_priority = 99 };
1553         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1554         /* FIXME maybe place it in peer
1555          * should be done for each port (sportno to eportno)
1556          */
1557         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
1558         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
1559
1560         req_cond = st_cond_new();
1561
1562 //      test_map(peer);
1563
1564         return 0;
1565 }
1566
1567 /* FIXME this should not be here */
1568 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
1569 {
1570         struct xseg *xseg = peer->xseg;
1571         xport portno_start = peer->portno_start;
1572         xport portno_end = peer->portno_end;
1573         struct peer_req *pr;
1574         xport i;
1575         int  r, c = 0;
1576         struct xseg_request *received;
1577         xseg_prepare_wait(xseg, portno_start);
1578         while(1) {
1579                 XSEGLOG2(&lc, D, "Attempting to check for reply");
1580                 c = 1;
1581                 while (c){
1582                         c = 0;
1583                         for (i = portno_start; i <= portno_end; i++) {
1584                                 received = xseg_receive(xseg, i, 0);
1585                                 if (received) {
1586                                         c = 1;
1587                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
1588                                         if (r < 0 || !pr || received != expected_req){
1589                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
1590                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
1591                                                 if (p == NoPort){
1592                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
1593                                                         xseg_put_request(xseg, received, portno_start);
1594                                                         continue;
1595                                                 } else {
1596                                                         xseg_signal(xseg, p);
1597                                                 }
1598                                         } else {
1599                                                 xseg_cancel_wait(xseg, portno_start);
1600                                                 return 0;
1601                                         }
1602                                 }
1603                         }
1604                 }
1605                 xseg_wait_signal(xseg, peer->sd, 1000000UL);
1606         }
1607 }
1608
1609
1610 void custom_peer_finalize(struct peerd *peer)
1611 {
1612         struct mapperd *mapper = __get_mapperd(peer);
1613         struct peer_req *pr = alloc_peer_req(peer);
1614         if (!pr){
1615                 XSEGLOG2(&lc, E, "Cannot get peer request");
1616                 return;
1617         }
1618         struct map *map;
1619         struct xseg_request *req;
1620         xhash_iter_t it;
1621         xhashidx key, val;
1622         xhash_iter_init(mapper->hashmaps, &it);
1623         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
1624                 map = (struct map *)val;
1625                 if (!(map->state & MF_MAP_EXCLUSIVE))
1626                         continue;
1627                 req = __close_map(pr, map);
1628                 if (!req)
1629                         continue;
1630                 wait_reply(peer, req);
1631                 if (!(req->state & XS_SERVED))
1632                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
1633                 map->state &= ~MF_MAP_CLOSING;
1634                 put_request(pr, req);
1635         }
1636         return;
1637
1638
1639 }
1640 /*
1641 void print_obj(struct map_node *mn)
1642 {
1643         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
1644                         (unsigned long long) mn->objectidx, mn->object, 
1645                         (unsigned int) mn->objectlen, 
1646                         (mn->flags & MF_OBJECT_WRITABLE) ? 'y' : 'n');
1647 }
1648
1649 void print_map(struct map *m)
1650 {
1651         uint64_t nr_objs = m->size/MAPPER_DEFAULT_BLOCKSIZE;
1652         if (m->size % MAPPER_DEFAULT_BLOCKSIZE)
1653                 nr_objs++;
1654         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
1655                         m->volume, m->volumelen, 
1656                         (unsigned long long) m->size, 
1657                         (unsigned long long) nr_objs,
1658                         m->version);
1659         uint64_t i;
1660         struct map_node *mn;
1661         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1662                 return;
1663         for (i = 0; i < nr_objs; i++) {
1664                 mn = find_object(m, i);
1665                 if (!mn){
1666                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
1667                         continue;
1668                 }
1669                 print_obj(mn);
1670         }
1671 }
1672 */