3ef0a40cc5bb66f1d0aa92501698734dfcc5fabc
[archipelago] / xseg / peers / user / mt-mapperd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <pthread.h>
39 #include <xseg/xseg.h>
40 #include <peer.h>
41 #include <time.h>
42 #include <xtypes/xlock.h>
43 #include <xtypes/xhash.h>
44 #include <xseg/protocol.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <sched.h>
49 #include <sys/syscall.h>
50 #include <openssl/sha.h>
51 #include <ctype.h>
52
53 /* general mapper flags */
54 #define MF_LOAD         (1 << 0)
55 #define MF_EXCLUSIVE    (1 << 1)
56 #define MF_FORCE        (1 << 2)
57 #define MF_ARCHIP       (1 << 3)
58
59 #ifndef SHA256_DIGEST_SIZE
60 #define SHA256_DIGEST_SIZE 32
61 #endif
62 /* hex representation of sha256 value takes up double the sha256 size */
63 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64
65 #define block_size (1<<22) //FIXME this should be defined here?
66
67 /* transparency byte + max object len in disk */
68 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69
70 /* Map header contains:
71  *      map version
72  *      volume size
73  */
74 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75
76
77 #define MAPPER_PREFIX "archip_"
78 #define MAPPER_PREFIX_LEN 7
79
80 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82
83 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85 #endif
86
87 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88
89 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91 #endif
92
93 #define MAX_VOLUME_SIZE \
94 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95
96
97 //char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98
99 /* pithos considers this a block full of zeros, so should we.
100  * it is actually the sha256 hash of nothing.
101  */
102 char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104
105 /* dispatch_internal mapper states */
106 enum mapper_state {
107         ACCEPTED = 0,
108         WRITING = 1,
109         COPYING = 2,
110         DELETING = 3,
111         DROPPING_CACHE = 4
112 };
113
114 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115
116
117 /* mapper object flags */
118 #define MF_OBJECT_EXIST         (1 << 0)
119 #define MF_OBJECT_COPYING       (1 << 1)
120 #define MF_OBJECT_WRITING       (1 << 2)
121 #define MF_OBJECT_DELETING      (1 << 3)
122 #define MF_OBJECT_DESTROYED     (1 << 5)
123 #define MF_OBJECT_SNAPSHOTTING  (1 << 6)
124
125 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126                                 MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127 struct map_node {
128         uint32_t flags;
129         uint32_t objectidx;
130         uint32_t objectlen;
131         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
132         struct map *map;
133         uint32_t ref;
134         uint32_t waiters;
135         st_cond_t cond;
136 };
137
138
139 #define wait_on_pr(__pr, __condition__)         \
140         while (__condition__){                  \
141                 ta--;                           \
142                 __get_mapper_io(pr)->active = 0;\
143                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144                 st_cond_wait(__pr->cond);       \
145         }
146
147 #define wait_on_mapnode(__mn, __condition__)    \
148         while (__condition__){                  \
149                 ta--;                           \
150                 __mn->waiters++;                \
151                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153                 st_cond_wait(__mn->cond);       \
154         }
155
156 #define wait_on_map(__map, __condition__)       \
157         while (__condition__){                  \
158                 ta--;                           \
159                 __map->waiters++;               \
160                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161                                    __map, __map->volume, __map->waiters, ta); \
162                 st_cond_wait(__map->cond);      \
163         }
164
165 #define signal_pr(__pr)                         \
166         do {                                    \
167                 if (!__get_mapper_io(pr)->active){\
168                         ta++;                   \
169                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170                         __get_mapper_io(pr)->active = 1;\
171                         st_cond_signal(__pr->cond);     \
172                 }                               \
173         }while(0)
174
175 #define signal_map(__map)                       \
176         do {                                    \
177                 if (__map->waiters) {           \
178                         ta += 1;                \
179                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
180                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
181                         __map->waiters--;       \
182                         st_cond_signal(__map->cond);    \
183                 }                               \
184         }while(0)
185
186 #define signal_mapnode(__mn)                    \
187         do {                                    \
188                 if (__mn->waiters) {            \
189                         ta += __mn->waiters;    \
190                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
191                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
192                         __mn->waiters = 0;      \
193                         st_cond_broadcast(__mn->cond);  \
194                 }                               \
195         }while(0)
196
197
198 /* map flags */
199 #define MF_MAP_LOADING          (1 << 0)
200 #define MF_MAP_DESTROYED        (1 << 1)
201 #define MF_MAP_WRITING          (1 << 2)
202 #define MF_MAP_DELETING         (1 << 3)
203 #define MF_MAP_DROPPING_CACHE   (1 << 4)
204 #define MF_MAP_EXCLUSIVE        (1 << 5)
205 #define MF_MAP_OPENING          (1 << 6)
206 #define MF_MAP_CLOSING          (1 << 7)
207 #define MF_MAP_DELETED          (1 << 8)
208 #define MF_MAP_SNAPSHOTTING     (1 << 9)
209
210 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
211                                 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|          \
212                                 MF_MAP_SNAPSHOTTING)
213
214 struct map {
215         uint32_t version;
216         uint32_t flags;
217         uint64_t size;
218         uint32_t volumelen;
219         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
220         xhash_t *objects;       /* obj_index --> map_node */
221         uint32_t ref;
222         uint32_t waiters;
223         st_cond_t cond;
224 };
225
226 struct mapperd {
227         xport bportno;          /* blocker that accesses data */
228         xport mbportno;         /* blocker that accesses maps */
229         xhash_t *hashmaps; // hash_function(target) --> struct map
230 };
231
232 struct mapper_io {
233         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
234         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
235         struct map_node *copyup_node;
236         volatile int err;                       /* error flag */
237         volatile uint64_t del_pending;
238         volatile uint64_t snap_pending;
239         uint64_t delobj;
240         uint64_t dcobj;
241         cb_t cb;
242         enum mapper_state state;
243         volatile int active;
244 };
245
246 /* global vars */
247 struct mapperd *mapper;
248
249 void print_map(struct map *m);
250
251
252 void custom_peer_usage()
253 {
254         fprintf(stderr, "Custom peer options: \n"
255                         "-bp  : port for block blocker(!)\n"
256                         "-mbp : port for map blocker\n"
257                         "\n");
258 }
259
260
261 /*
262  * Helper functions
263  */
264
265 static inline struct mapperd * __get_mapperd(struct peerd *peer)
266 {
267         return (struct mapperd *) peer->priv;
268 }
269
270 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
271 {
272         return (struct mapper_io *) pr->priv;
273 }
274
275 static inline uint64_t calc_map_obj(struct map *map)
276 {
277         if (map->size == -1)
278                 return 0;
279         uint64_t nr_objs = map->size / block_size;
280         if (map->size % block_size)
281                 nr_objs++;
282         return nr_objs;
283 }
284
285 static uint32_t calc_nr_obj(struct xseg_request *req)
286 {
287         unsigned int r = 1;
288         uint64_t rem_size = req->size;
289         uint64_t obj_offset = req->offset & (block_size -1); //modulo
290         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
291         rem_size -= obj_size;
292         while (rem_size > 0) {
293                 obj_size = (rem_size > block_size) ? block_size : rem_size;
294                 rem_size -= obj_size;
295                 r++;
296         }
297
298         return r;
299 }
300
301 /* hexlify function. 
302  * Unsafe. Doesn't check if data length is odd!
303  */
304 static void hexlify(unsigned char *data, char *hex)
305 {
306         int i;
307         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
308                 sprintf(hex+2*i, "%02x", data[i]);
309 }
310
311 static void unhexlify(char *hex, unsigned char *data)
312 {
313         int i;
314         char c;
315         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
316                 data[i] = 0;
317                 c = hex[2*i];
318                 if (isxdigit(c)){
319                         if (isdigit(c)){
320                                 c-= '0';
321                         }
322                         else {
323                                 c = tolower(c);
324                                 c = c-'a' + 10;
325                         }
326                 }
327                 else {
328                         c = 0;
329                 }
330                 data[i] |= (c << 4) & 0xF0;
331                 c = hex[2*i+1];
332                 if (isxdigit(c)){
333                         if (isdigit(c)){
334                                 c-= '0';
335                         }
336                         else {
337                                 c = tolower(c);
338                                 c = c-'a' + 10;
339                         }
340                 }
341                 else {
342                         c = 0;
343                 }
344                 data[i] |= c & 0x0F;
345         }
346 }
347 /*
348  * Maps handling functions
349  */
350
351 static struct map * find_map(struct mapperd *mapper, char *volume)
352 {
353         struct map *m = NULL;
354         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
355                                 (xhashidx *) &m);
356         if (r < 0)
357                 return NULL;
358         return m;
359 }
360
361 static struct map * find_map_len(struct mapperd *mapper, char *target,
362                                         uint32_t targetlen, uint32_t flags)
363 {
364         char buf[XSEG_MAX_TARGETLEN+1];
365         if (flags & MF_ARCHIP){
366                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
367                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
368                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
369                 targetlen += MAPPER_PREFIX_LEN;
370         }
371         else {
372                 strncpy(buf, target, targetlen);
373                 buf[targetlen] = 0;
374         }
375
376         if (targetlen > MAX_VOLUME_LEN){
377                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
378                                         targetlen, MAX_VOLUME_LEN);
379                 return NULL;
380         }
381
382         XSEGLOG2(&lc, D, "looking up map %s, len %u",
383                         buf, targetlen);
384         return find_map(mapper, buf);
385 }
386
387
388 static int insert_map(struct mapperd *mapper, struct map *map)
389 {
390         int r = -1;
391
392         if (find_map(mapper, map->volume)){
393                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
394                 goto out;
395         }
396
397         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
398                         map->volume, strlen(map->volume), (unsigned long) map);
399         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
400         while (r == -XHASH_ERESIZE) {
401                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
402                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
403                 if (!new_hashmap){
404                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
405                                         (unsigned long long) shift);
406                         goto out;
407                 }
408                 mapper->hashmaps = new_hashmap;
409                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
410         }
411 out:
412         return r;
413 }
414
415 static int remove_map(struct mapperd *mapper, struct map *map)
416 {
417         int r = -1;
418
419         //assert no pending pr on map
420
421         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
422         while (r == -XHASH_ERESIZE) {
423                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
424                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
425                 if (!new_hashmap){
426                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
427                                         (unsigned long long) shift);
428                         goto out;
429                 }
430                 mapper->hashmaps = new_hashmap;
431                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
432         }
433 out:
434         return r;
435 }
436
437 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
438 {
439         int r;
440         xport p;
441         struct peerd *peer = pr->peer;
442         struct xseg_request *req;
443         struct mapperd *mapper = __get_mapperd(peer);
444         void *dummy;
445
446         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
447
448         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
449         if (!req){
450                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
451                                 map->volume);
452                 goto out_err;
453         }
454
455         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
456         if (r < 0){
457                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
458                                 map->volume);
459                 goto out_put;
460         }
461
462         char *reqtarget = xseg_get_target(peer->xseg, req);
463         if (!reqtarget)
464                 goto out_put;
465         strncpy(reqtarget, map->volume, req->targetlen);
466         req->op = X_RELEASE;
467         req->size = 0;
468         req->offset = 0;
469         r = xseg_set_req_data(peer->xseg, req, pr);
470         if (r < 0){
471                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
472                                 map->volume);
473                 goto out_put;
474         }
475         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
476         if (p == NoPort){
477                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
478                                 map->volume);
479                 goto out_unset;
480         }
481         r = xseg_signal(peer->xseg, p);
482         map->flags |= MF_MAP_CLOSING;
483
484         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
485         return req;
486
487 out_unset:
488         xseg_get_req_data(peer->xseg, req, &dummy);
489 out_put:
490         xseg_put_request(peer->xseg, req, pr->portno);
491 out_err:
492         return NULL;
493 }
494
495 static int close_map(struct peer_req *pr, struct map *map)
496 {
497         int err;
498         struct xseg_request *req;
499         struct peerd *peer = pr->peer;
500
501         req = __close_map(pr, map);
502         if (!req)
503                 return -1;
504         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
505         map->flags &= ~MF_MAP_CLOSING;
506         err = req->state & XS_FAILED;
507         xseg_put_request(peer->xseg, req, pr->portno);
508         if (err)
509                 return -1;
510         return 0;
511 }
512
513 /*
514 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
515                                 char *target, uint32_t targetlen, struct map **m)
516 {
517         struct mapperd *mapper = __get_mapperd(peer);
518         int r;
519         *m = find_map(mapper, target, targetlen);
520         if (*m) {
521                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
522                 if ((*m)->flags & MF_MAP_NOT_READY) {
523                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
524                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
525                         return MF_PENDING;
526                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
527                 //      return -1;
528                 // 
529                 }else {
530                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
531                         return 0;
532                 }
533         }
534         r = open_map(peer, pr, target, targetlen, 0);
535         if (r < 0)
536                 return -1; //error
537         return MF_PENDING;      
538 }
539 */
540 /*
541  * Object handling functions
542  */
543
544 struct map_node *find_object(struct map *map, uint64_t obj_index)
545 {
546         struct map_node *mn;
547         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
548         if (r < 0)
549                 return NULL;
550         return mn;
551 }
552
553 static int insert_object(struct map *map, struct map_node *mn)
554 {
555         //FIXME no find object first
556         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
557         if (r == -XHASH_ERESIZE) {
558                 unsigned long shift = xhash_grow_size_shift(map->objects);
559                 map->objects = xhash_resize(map->objects, shift, NULL);
560                 if (!map->objects)
561                         return -1;
562                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
563         }
564         return r;
565 }
566
567
568 /*
569  * map read/write functions
570  *
571  * version 0 -> pithos map
572  * version 1 -> archipelago version 1
573  *
574  *
575  * functions
576  *      int read_object(struct map_node *mn, unsigned char *buf)
577  *      int prepare_write_object(struct peer_req *pr, struct map *map,
578  *                              struct map_node *mn, struct xseg_request *req)
579  *      int read_map(struct map *m, unsigned char * data)
580  *      int prepare_write_map(struct peer_req *pr, struct map *map,
581  *                                      struct xseg_request *req)
582  */
583
584 struct map_functions {
585         int (*read_object)(struct map_node *mn, unsigned char *buf);
586         int (*prepare_write_object)(struct peer_req *pr, struct map *map,
587                                 struct map_node *mn, struct xseg_request *req);
588         int (*read_map)(struct map *m, unsigned char * data);
589         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
590                                         struct xseg_request *req);
591 };
592
593 /* version 0 functions */
594
595 /* no header */
596 #define v0_mapheader_size 0
597 /* just the unhexlified name */
598 #define v0_objectsize_in_map SHA256_DIGEST_SIZE
599
600 static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
601 {
602         hexlify(buf, mn->object);
603         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
604         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
605         mn->flags = MF_OBJECT_EXIST;
606
607         return 0;
608 }
609
610 static void v0_object_to_map(struct map_node *mn, unsigned char *data)
611 {
612         unhexlify(mn->object, data);
613 }
614
615 static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
616                         struct map_node *mn, struct xseg_request *req)
617 {
618         struct peerd *peer = pr->peer;
619         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
620         if (r < 0){
621                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
622                                 "(Map: %s [%llu]",
623                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
624                 return -1;
625         }
626         char *target = xseg_get_target(peer->xseg, req);
627         strncpy(target, map->volume, req->targetlen);
628         req->size = req->datalen;
629         req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
630
631         unsigned char *data = xseg_get_data(pr->peer->xseg, req);
632         v0_object_to_map(mn, data);
633         return -1;
634 }
635
636 static int read_map_v0(struct map *m, unsigned char * data)
637 {
638         int r;
639         struct map_node *map_node;
640         uint64_t i;
641         uint64_t pos = 0;
642         uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
643         XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
644         char nulls[SHA256_DIGEST_SIZE];
645         memset(nulls, 0, SHA256_DIGEST_SIZE);
646         map_node = calloc(max_nr_objs, sizeof(struct map_node));
647         if (!map_node)
648                 return -1;
649         for (i = 0; i < max_nr_objs; i++) {
650                 if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
651                         break;
652                 map_node[i].objectidx = i;
653                 map_node[i].map = m;
654                 map_node[i].waiters = 0;
655                 map_node[i].ref = 1;
656                 map_node[i].cond = st_cond_new(); //FIXME err check;
657                 read_object_v0(&map_node[i], data+pos);
658                 pos += v0_objectsize_in_map;
659                 r = insert_object(m, &map_node[i]); //FIXME error check
660         }
661         XSEGLOG2(&lc, D, "Found %llu objects", i);
662         m->size = i * block_size;
663         return 0;
664 }
665
666 static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
667                                 struct xseg_request *req)
668 {
669         struct peerd *peer = pr->peer;
670         uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
671         struct map_node *mn;
672         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
673                         v0_mapheader_size + max_objidx * v0_objectsize_in_map);
674         if (r < 0){
675                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
676                 return -1;
677         }
678         char *target = xseg_get_target(peer->xseg, req);
679         strncpy(target, map->volume, req->targetlen);
680         char *data = xseg_get_data(peer->xseg, req);
681
682         req->op = X_WRITE;
683         req->size = req->datalen;
684         req->offset = 0;
685
686         for (i = 0; i < max_objidx; i++) {
687                 mn = find_object(map, i);
688                 if (!mn){
689                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
690                                         (unsigned long long) i, map->volume);
691                         return -1;
692                 }
693                 v0_object_to_map(mn, (unsigned char *)(data+pos));
694                 pos += v0_objectsize_in_map;
695         }
696         XSEGLOG2(&lc, D, "Prepared %llu objects", i);
697         return 0;
698 }
699
700 /* static struct map_functions map_functions_v0 =       { */
701 /*                      .read_object = read_object_v0, */
702 /*                      .read_map = read_map_v0, */
703 /*                      .prepare_write_object = prepare_write_object_v0, */
704 /*                      .prepare_write_map = prepare_write_map_v0 */
705 /* }; */
706 #define map_functions_v0 {                              \
707                         .read_object = read_object_v0,  \
708                         .read_map = read_map_v0,        \
709                         .prepare_write_object = prepare_write_object_v0,\
710                         .prepare_write_map = prepare_write_map_v0       \
711                         }
712 /* v1 functions */
713
714 /* transparency byte + max object len in disk */
715 #define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
716
717 /* Map header contains:
718  *      map version
719  *      volume size
720  */
721 #define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
722
723 static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
724 {
725         char c = buf[0];
726         mn->flags = 0;
727         if (c){
728                 mn->flags |= MF_OBJECT_EXIST;
729                 strcpy(mn->object, MAPPER_PREFIX);
730                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
731                 mn->object[MAX_OBJECT_LEN] = 0;
732                 mn->objectlen = strlen(mn->object);
733         }
734         else {
735                 mn->flags &= ~MF_OBJECT_EXIST;
736                 hexlify(buf+1, mn->object);
737                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
738                 mn->objectlen = strlen(mn->object);
739         }
740         return 0;
741 }
742
743 static inline void v1_object_to_map(char* buf, struct map_node *mn)
744 {
745         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
746         if (buf[0]){
747                 /* strip common prefix */
748                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
749         }
750         else {
751                 unhexlify(mn->object, (unsigned char *)(buf+1));
752         }
753 }
754
755 static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
756                                 struct map_node *mn, struct xseg_request *req)
757 {
758         struct peerd *peer = pr->peer;
759         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
760         if (r < 0){
761                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
762                                 "(Map: %s [%llu]",
763                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
764                 return -1;
765         }
766         char *target = xseg_get_target(peer->xseg, req);
767         strncpy(target, map->volume, req->targetlen);
768         req->size = req->datalen;
769         req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
770
771         char *data = xseg_get_data(pr->peer->xseg, req);
772         v1_object_to_map(data, mn);
773         return 0;
774 }
775
776 static int read_map_v1(struct map *m, unsigned char * data)
777 {
778         int r;
779         struct map_node *map_node;
780         uint64_t i;
781         uint64_t pos = 0;
782         uint64_t nr_objs;
783
784         /* read header */
785         m->version = *(uint32_t *) (data + pos);
786         pos += sizeof(uint32_t);
787         m->size = *(uint64_t *) (data + pos);
788         pos += sizeof(uint64_t);
789
790         /* read objects */
791         nr_objs = m->size / block_size;
792         if (m->size % block_size)
793                 nr_objs++;
794         map_node = calloc(nr_objs, sizeof(struct map_node));
795         if (!map_node)
796                 return -1;
797
798         for (i = 0; i < nr_objs; i++) {
799                 map_node[i].map = m;
800                 map_node[i].objectidx = i;
801                 map_node[i].waiters = 0;
802                 map_node[i].ref = 1;
803                 map_node[i].cond = st_cond_new(); //FIXME err check;
804                 read_object_v1(&map_node[i], data+pos);
805                 pos += objectsize_in_map;
806                 r = insert_object(m, &map_node[i]); //FIXME error check
807         }
808         return 0;
809 }
810
811 static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
812                                 struct xseg_request *req)
813 {
814         struct peerd *peer = pr->peer;
815         uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
816         struct map_node *mn;
817
818         int r = xseg_prep_request(peer->xseg, req, m->volumelen,
819                         v1_mapheader_size + max_objidx * v1_objectsize_in_map);
820         if (r < 0){
821                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
822                 return -1;
823         }
824         char *target = xseg_get_target(peer->xseg, req);
825         strncpy(target, m->volume, req->targetlen);
826         char *data = xseg_get_data(peer->xseg, req);
827
828         memcpy(data + pos, &m->version, sizeof(m->version));
829         pos += sizeof(m->version);
830         memcpy(data + pos, &m->size, sizeof(m->size));
831         pos += sizeof(m->size);
832
833         req->op = X_WRITE;
834         req->size = req->datalen;
835         req->offset = 0;
836
837         for (i = 0; i < max_objidx; i++) {
838                 mn = find_object(m, i);
839                 if (!mn){
840                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
841                                         (unsigned long long) i, m->volume);
842                         return -1;
843                 }
844                 v1_object_to_map(data+pos, mn);
845                 pos += v1_objectsize_in_map;
846         }
847         return 0;
848 }
849
850 /* static struct map_functions map_functions_v1 =       { */
851 /*                      .read_object = read_object_v1, */
852 /*                      .read_map = read_map_v1, */
853 /*                      .prepare_write_object = prepare_write_object_v1, */
854 /*                      .prepare_write_map = prepare_write_map_v1 */
855 /* }; */
856 #define map_functions_v1 {                              \
857                         .read_object = read_object_v1,  \
858                         .read_map = read_map_v1,        \
859                         .prepare_write_object = prepare_write_object_v1,\
860                         .prepare_write_map = prepare_write_map_v1       \
861                         }
862
863 static struct map_functions map_functions[] = { map_functions_v0,
864                                                 map_functions_v1 };
865 #define MAP_LATEST_VERSION 1
866 /* end of functions */
867
868
869
870
871
872 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
873 {
874         hexlify(buf, mn->object);
875         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
876         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
877         mn->flags = MF_OBJECT_EXIST;
878 }
879
880 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
881 {
882         char c = buf[0];
883         mn->flags = 0;
884         if (c){
885                 mn->flags |= MF_OBJECT_EXIST;
886                 strcpy(mn->object, MAPPER_PREFIX);
887                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
888                 mn->object[MAX_OBJECT_LEN] = 0;
889                 mn->objectlen = strlen(mn->object);
890         }
891         else {
892                 hexlify(buf+1, mn->object);
893                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
894                 mn->objectlen = strlen(mn->object);
895         }
896
897 }
898
899 static inline void object_to_map(char* buf, struct map_node *mn)
900 {
901         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
902         if (buf[0]){
903                 /* strip common prefix */
904                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
905         }
906         else {
907                 unhexlify(mn->object, (unsigned char *)(buf+1));
908         }
909 }
910
911 static inline void mapheader_to_map(struct map *m, char *buf)
912 {
913         uint64_t pos = 0;
914         memcpy(buf + pos, &m->version, sizeof(m->version));
915         pos += sizeof(m->version);
916         memcpy(buf + pos, &m->size, sizeof(m->size));
917         pos += sizeof(m->size);
918 }
919
920
921 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
922                                 struct map *map, struct map_node *mn)
923 {
924         int r;
925         void *dummy;
926         struct mapperd *mapper = __get_mapperd(peer);
927         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
928                                                         mapper->mbportno, X_ALLOC);
929         if (!req){
930                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
931                                 "(Map: %s [%llu]",
932                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
933                 goto out_err;
934         }
935
936         r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
937         if (r < 0){
938                 XSEGLOG2(&lc, E, "Cannot prepare write object");
939                 goto out_put;
940         }
941         req->op = X_WRITE;
942
943         r = xseg_set_req_data(peer->xseg, req, pr);
944         if (r < 0){
945                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
946                                 "(Map: %s [%llu]",
947                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
948                 goto out_put;
949         }
950         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
951         if (p == NoPort){
952                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
953                                 "(Map: %s [%llu]",
954                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
955                 goto out_unset;
956         }
957         r = xseg_signal(peer->xseg, p);
958         if (r < 0)
959                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
960
961         XSEGLOG2(&lc, I, "Writing object %s \n\t"
962                         "Map: %s [%llu]",
963                         mn->object, map->volume, (unsigned long long) mn->objectidx);
964
965         return req;
966
967 out_unset:
968         xseg_get_req_data(peer->xseg, req, &dummy);
969 out_put:
970         xseg_put_request(peer->xseg, req, pr->portno);
971 out_err:
972         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
973                         "(Map: %s [%llu]",
974                         mn->object, map->volume, (unsigned long long) mn->objectidx);
975         return NULL;
976 }
977
978 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
979 {
980         int r;
981         void *dummy;
982         struct peerd *peer = pr->peer;
983         struct mapperd *mapper = __get_mapperd(peer);
984         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
985                                                         mapper->mbportno, X_ALLOC);
986         if (!req){
987                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
988                 goto out_err;
989         }
990
991         r = map_functions[map->version].prepare_write_map(pr, map, req);
992         if (r < 0){
993                 XSEGLOG2(&lc, E, "Cannot prepare write map");
994                 goto out_put;
995         }
996
997         req->op = X_WRITE;
998
999         r = xseg_set_req_data(peer->xseg, req, pr);
1000         if (r < 0){
1001                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1002                                 map->volume);
1003                 goto out_put;
1004         }
1005         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1006         if (p == NoPort){
1007                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1008                                 map->volume);
1009                 goto out_unset;
1010         }
1011         r = xseg_signal(peer->xseg, p);
1012         if (r < 0)
1013                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1014
1015         map->flags |= MF_MAP_WRITING;
1016         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1017         return req;
1018
1019 out_unset:
1020         xseg_get_req_data(peer->xseg, req, &dummy);
1021 out_put:
1022         xseg_put_request(peer->xseg, req, pr->portno);
1023 out_err:
1024         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1025         return NULL;
1026 }
1027
1028 static int write_map(struct peer_req* pr, struct map *map)
1029 {
1030         int r = 0;
1031         struct peerd *peer = pr->peer;
1032         struct xseg_request *req = __write_map(pr, map);
1033         if (!req)
1034                 return -1;
1035         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1036         if (req->state & XS_FAILED)
1037                 r = -1;
1038         xseg_put_request(peer->xseg, req, pr->portno);
1039         map->flags &= ~MF_MAP_WRITING;
1040         return r;
1041 }
1042
1043 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1044 {
1045         int r;
1046         xport p;
1047         struct xseg_request *req;
1048         struct peerd *peer = pr->peer;
1049         struct mapperd *mapper = __get_mapperd(peer);
1050         void *dummy;
1051
1052         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1053
1054         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1055         if (!req){
1056                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1057                                 m->volume);
1058                 goto out_fail;
1059         }
1060
1061         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1062         if (r < 0){
1063                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1064                                 m->volume);
1065                 goto out_put;
1066         }
1067
1068         char *reqtarget = xseg_get_target(peer->xseg, req);
1069         if (!reqtarget)
1070                 goto out_put;
1071         strncpy(reqtarget, m->volume, req->targetlen);
1072         req->op = X_READ;
1073         req->size = block_size;
1074         req->offset = 0;
1075         r = xseg_set_req_data(peer->xseg, req, pr);
1076         if (r < 0){
1077                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1078                                 m->volume);
1079                 goto out_put;
1080         }
1081         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1082         if (p == NoPort){
1083                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1084                                 m->volume);
1085                 goto out_unset;
1086         }
1087         r = xseg_signal(peer->xseg, p);
1088
1089         m->flags |= MF_MAP_LOADING;
1090         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1091         return req;
1092
1093 out_unset:
1094         xseg_get_req_data(peer->xseg, req, &dummy);
1095 out_put:
1096         xseg_put_request(peer->xseg, req, pr->portno);
1097 out_fail:
1098         return NULL;
1099 }
1100
1101 static int read_map (struct map *map, unsigned char *buf)
1102 {
1103         char nulls[SHA256_DIGEST_SIZE];
1104         memset(nulls, 0, SHA256_DIGEST_SIZE);
1105
1106         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1107         if (r) {
1108                 XSEGLOG2(&lc, E, "Read zeros");
1109                 return -1;
1110         }
1111         //type 1, archip type, type 0 pithos map
1112         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1113         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1114         uint32_t version;
1115         if (type)
1116                 version = *(uint32_t *) (buf); //version should always be the first uint32_t
1117         else
1118                 version = 0;
1119         if (version > MAP_LATEST_VERSION){
1120                 XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1121                                 map->volume, version);
1122                 return -1;
1123         }
1124
1125         r = map_functions[version].read_map(map, buf);
1126         if (r < 0){
1127                 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1128                 return -1;
1129         }
1130
1131         print_map(map);
1132         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1133         return 0;
1134
1135 }
1136
1137 static int load_map(struct peer_req *pr, struct map *map)
1138 {
1139         int r = 0;
1140         struct xseg_request *req;
1141         struct peerd *peer = pr->peer;
1142         req = __load_map(pr, map);
1143         if (!req)
1144                 return -1;
1145         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1146         map->flags &= ~MF_MAP_LOADING;
1147         if (req->state & XS_FAILED){
1148                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1149                 xseg_put_request(peer->xseg, req, pr->portno);
1150                 return -1;
1151         }
1152         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1153         xseg_put_request(peer->xseg, req, pr->portno);
1154         return r;
1155 }
1156
1157 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1158                                                 uint32_t flags)
1159 {
1160         int r;
1161         xport p;
1162         struct xseg_request *req;
1163         struct peerd *peer = pr->peer;
1164         struct mapperd *mapper = __get_mapperd(peer);
1165         void *dummy;
1166
1167         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1168
1169         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1170         if (!req){
1171                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1172                                 m->volume);
1173                 goto out_fail;
1174         }
1175
1176         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1177         if (r < 0){
1178                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1179                                 m->volume);
1180                 goto out_put;
1181         }
1182
1183         char *reqtarget = xseg_get_target(peer->xseg, req);
1184         if (!reqtarget)
1185                 goto out_put;
1186         strncpy(reqtarget, m->volume, req->targetlen);
1187         req->op = X_ACQUIRE;
1188         req->size = block_size;
1189         req->offset = 0;
1190         if (!(flags & MF_FORCE))
1191                 req->flags = XF_NOSYNC;
1192         r = xseg_set_req_data(peer->xseg, req, pr);
1193         if (r < 0){
1194                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1195                                 m->volume);
1196                 goto out_put;
1197         }
1198         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1199         if (p == NoPort){ 
1200                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1201                                 m->volume);
1202                 goto out_unset;
1203         }
1204         r = xseg_signal(peer->xseg, p);
1205
1206         m->flags |= MF_MAP_OPENING;
1207         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1208         return req;
1209
1210 out_unset:
1211         xseg_get_req_data(peer->xseg, req, &dummy);
1212 out_put:
1213         xseg_put_request(peer->xseg, req, pr->portno);
1214 out_fail:
1215         return NULL;
1216 }
1217
1218 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1219 {
1220         int err;
1221         struct xseg_request *req;
1222         struct peerd *peer = pr->peer;
1223
1224         req = __open_map(pr, map, flags);
1225         if (!req){
1226                 return -1;
1227         }
1228         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1229         map->flags &= ~MF_MAP_OPENING;
1230         err = req->state & XS_FAILED;
1231         xseg_put_request(peer->xseg, req, pr->portno);
1232         if (err)
1233                 return -1;
1234         else
1235                 map->flags |= MF_MAP_EXCLUSIVE;
1236         return 0;
1237 }
1238
1239 /*
1240  * copy up functions
1241  */
1242
1243 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1244 {
1245         int r = 0;
1246         if (mn){
1247                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1248                                 req, mn, mio);
1249                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1250                 if (r == -XHASH_ERESIZE) {
1251                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1252                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1253                         if (!new_hashmap)
1254                                 goto out;
1255                         mio->copyups_nodes = new_hashmap;
1256                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1257                 }
1258                 if (r < 0)
1259                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1260                                         req, mn, mio);
1261         }
1262         else {
1263                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1264                                 req, mio);
1265                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1266                 if (r == -XHASH_ERESIZE) {
1267                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1268                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1269                         if (!new_hashmap)
1270                                 goto out;
1271                         mio->copyups_nodes = new_hashmap;
1272                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1273                 }
1274                 if (r < 0)
1275                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1276                                         req, mio);
1277         }
1278 out:
1279         return r;
1280 }
1281
1282 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1283 {
1284         struct map_node *mn;
1285         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1286         if (r < 0){
1287                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1288                 return NULL;
1289         }
1290         XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1291         return mn;
1292 }
1293
1294 static struct xseg_request * __snapshot_object(struct peer_req *pr,
1295                                                 struct map_node *mn)
1296 {
1297         struct peerd *peer = pr->peer;
1298         struct mapperd *mapper = __get_mapperd(peer);
1299         struct mapper_io *mio = __get_mapper_io(pr);
1300         //struct map *map = mn->map;
1301         void *dummy;
1302         int r = -1;
1303         xport p;
1304
1305         //assert mn->volume != zero_block
1306         //assert mn->flags & MF_OBJECT_EXIST
1307         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1308                                                 mapper->bportno, X_ALLOC);
1309         if (!req){
1310                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1311                 goto out_err;
1312         }
1313         r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1314                                 sizeof(struct xseg_request_snapshot));
1315         if (r < 0){
1316                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1317                 goto out_put;
1318         }
1319
1320         char *target = xseg_get_target(peer->xseg, req);
1321         strncpy(target, mn->object, req->targetlen);
1322
1323         struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1324         xsnapshot->target[0] = 0;
1325         xsnapshot->targetlen = 0;
1326
1327         req->offset = 0;
1328         req->size = block_size;
1329         req->op = X_SNAPSHOT;
1330         r = xseg_set_req_data(peer->xseg, req, pr);
1331         if (r<0){
1332                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1333                 goto out_put;
1334         }
1335         r = __set_copyup_node(mio, req, mn);
1336         if (r < 0)
1337                 goto out_unset;
1338         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1339         if (p == NoPort) {
1340                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1341                 goto out_mapper_unset;
1342         }
1343         xseg_signal(peer->xseg, p);
1344
1345         mn->flags |= MF_OBJECT_SNAPSHOTTING;
1346         XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1347         return req;
1348
1349 out_mapper_unset:
1350         __set_copyup_node(mio, req, NULL);
1351 out_unset:
1352         xseg_get_req_data(peer->xseg, req, &dummy);
1353 out_put:
1354         xseg_put_request(peer->xseg, req, pr->portno);
1355 out_err:
1356         XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1357         return NULL;
1358 }
1359
1360 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1361 {
1362         struct mapperd *mapper = __get_mapperd(peer);
1363         struct mapper_io *mio = __get_mapper_io(pr);
1364         struct map *map = mn->map;
1365         void *dummy;
1366         int r = -1;
1367         xport p;
1368
1369         uint32_t newtargetlen;
1370         char new_target[MAX_OBJECT_LEN + 1];
1371         unsigned char sha[SHA256_DIGEST_SIZE];
1372
1373         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1374
1375         char tmp[XSEG_MAX_TARGETLEN + 1];
1376         uint32_t tmplen;
1377         strncpy(tmp, map->volume, map->volumelen);
1378         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1379         tmp[XSEG_MAX_TARGETLEN] = 0;
1380         tmplen = strlen(tmp);
1381         SHA256((unsigned char *)tmp, tmplen, sha);
1382         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1383         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1384
1385
1386         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1387                 goto copyup_zeroblock;
1388
1389         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1390                                                 mapper->bportno, X_ALLOC);
1391         if (!req){
1392                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1393                 goto out_err;
1394         }
1395         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1396                                 sizeof(struct xseg_request_copy));
1397         if (r < 0){
1398                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1399                 goto out_put;
1400         }
1401
1402         char *target = xseg_get_target(peer->xseg, req);
1403         strncpy(target, new_target, req->targetlen);
1404
1405         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1406         strncpy(xcopy->target, mn->object, mn->objectlen);
1407         xcopy->targetlen = mn->objectlen;
1408
1409         req->offset = 0;
1410         req->size = block_size;
1411         req->op = X_COPY;
1412         r = xseg_set_req_data(peer->xseg, req, pr);
1413         if (r<0){
1414                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1415                 goto out_put;
1416         }
1417         r = __set_copyup_node(mio, req, mn);
1418         if (r < 0)
1419                 goto out_unset;
1420         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1421         if (p == NoPort) {
1422                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1423                 goto out_mapper_unset;
1424         }
1425         xseg_signal(peer->xseg, p);
1426 //      mio->copyups++;
1427
1428         mn->flags |= MF_OBJECT_COPYING;
1429         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1430         return req;
1431
1432 out_mapper_unset:
1433         __set_copyup_node(mio, req, NULL);
1434 out_unset:
1435         xseg_get_req_data(peer->xseg, req, &dummy);
1436 out_put:
1437         xseg_put_request(peer->xseg, req, pr->portno);
1438 out_err:
1439         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1440         return NULL;
1441
1442 copyup_zeroblock:
1443         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1444                         "Proceeding in writing the new object in map");
1445         /* construct a tmp map_node for writing purposes */
1446         struct map_node newmn = *mn;
1447         newmn.flags = MF_OBJECT_EXIST;
1448         strncpy(newmn.object, new_target, newtargetlen);
1449         newmn.object[newtargetlen] = 0;
1450         newmn.objectlen = newtargetlen;
1451         newmn.objectidx = mn->objectidx; 
1452         req = object_write(peer, pr, map, &newmn);
1453         r = __set_copyup_node(mio, req, mn);
1454         if (r < 0)
1455                 return NULL;
1456         if (!req){
1457                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1458                                 "\n\t of map %s [%llu]",
1459                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1460                 return NULL;
1461         }
1462         mn->flags |= MF_OBJECT_WRITING;
1463         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1464         return req;
1465 }
1466
1467 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1468 {
1469         void *dummy;
1470         struct peerd *peer = pr->peer;
1471         struct mapperd *mapper = __get_mapperd(peer);
1472         struct mapper_io *mio = __get_mapper_io(pr);
1473         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1474                                                         mapper->bportno, X_ALLOC);
1475         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1476         if (!req){
1477                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1478                 goto out_err;
1479         }
1480         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1481         if (r < 0){
1482                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1483                 goto out_put;
1484         }
1485         char *target = xseg_get_target(peer->xseg, req);
1486         strncpy(target, mn->object, req->targetlen);
1487         req->op = X_DELETE;
1488         req->size = req->datalen;
1489         req->offset = 0;
1490         r = xseg_set_req_data(peer->xseg, req, pr);
1491         if (r < 0){
1492                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1493                 goto out_put;
1494         }
1495         r = __set_copyup_node(mio, req, mn);
1496         if (r < 0)
1497                 goto out_unset;
1498         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1499         if (p == NoPort){
1500                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1501                 goto out_mapper_unset;
1502         }
1503         r = xseg_signal(peer->xseg, p);
1504         mn->flags |= MF_OBJECT_DELETING;
1505         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1506         return req;
1507
1508 out_mapper_unset:
1509         __set_copyup_node(mio, req, NULL);
1510 out_unset:
1511         xseg_get_req_data(peer->xseg, req, &dummy);
1512 out_put:
1513         xseg_put_request(peer->xseg, req, pr->portno);
1514 out_err:
1515         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1516         return NULL;
1517 }
1518
1519 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1520 {
1521         void *dummy;
1522         struct peerd *peer = pr->peer;
1523         struct mapperd *mapper = __get_mapperd(peer);
1524         struct mapper_io *mio = __get_mapper_io(pr);
1525         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1526                                                         mapper->mbportno, X_ALLOC);
1527         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1528         if (!req){
1529                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1530                 goto out_err;
1531         }
1532         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1533         if (r < 0){
1534                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1535                 goto out_put;
1536         }
1537         char *target = xseg_get_target(peer->xseg, req);
1538         strncpy(target, map->volume, req->targetlen);
1539         req->op = X_DELETE;
1540         req->size = req->datalen;
1541         req->offset = 0;
1542         r = xseg_set_req_data(peer->xseg, req, pr);
1543         if (r < 0){
1544                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1545                 goto out_put;
1546         }
1547         /* do not check return value. just make sure there is no node set */
1548         __set_copyup_node(mio, req, NULL);
1549         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1550         if (p == NoPort){
1551                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1552                 goto out_unset;
1553         }
1554         r = xseg_signal(peer->xseg, p);
1555         map->flags |= MF_MAP_DELETING;
1556         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1557         return req;
1558
1559 out_unset:
1560         xseg_get_req_data(peer->xseg, req, &dummy);
1561 out_put:
1562         xseg_put_request(peer->xseg, req, pr->portno);
1563 out_err:
1564         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1565         return  NULL;
1566 }
1567
1568
1569 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1570 {
1571         struct map_node *mn = find_object(map, index);
1572         if (mn)
1573                 mn->ref++;
1574         return mn;
1575 }
1576
1577 static inline void put_mapnode(struct map_node *mn)
1578 {
1579         mn->ref--;
1580         if (!mn->ref){
1581                 //clean up mn
1582                 st_cond_destroy(mn->cond);
1583         }
1584 }
1585
1586 static inline void __get_map(struct map *map)
1587 {
1588         map->ref++;
1589 }
1590
1591 static inline void put_map(struct map *map)
1592 {
1593         struct map_node *mn;
1594         map->ref--;
1595         if (!map->ref){
1596                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1597                 //clean up map
1598                 uint64_t i;
1599                 for (i = 0; i < calc_map_obj(map); i++) {
1600                         mn = get_mapnode(map, i);
1601                         if (mn) {
1602                                 //make sure all pending operations on all objects are completed
1603                                 //this should never happen...
1604                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1605                                 mn->flags |= MF_OBJECT_DESTROYED;
1606                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1607                                 put_mapnode(mn); //matcing get_mapnode;
1608                                 //assert mn->ref == 0;
1609                         }
1610                 }
1611                 mn = find_object(map, 0);
1612                 if (mn)
1613                         free(mn);
1614                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1615                 free(map);
1616         }
1617 }
1618
1619 static struct map * create_map(struct mapperd *mapper, char *name,
1620                                 uint32_t namelen, uint32_t flags)
1621 {
1622         int r;
1623         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1624                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1625                                         namelen, MAX_VOLUME_LEN);
1626                 return NULL;
1627         }
1628         struct map *m = malloc(sizeof(struct map));
1629         if (!m){
1630                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1631                 goto out_err;
1632         }
1633         m->size = -1;
1634         if (flags & MF_ARCHIP){
1635                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1636                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1637                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1638                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1639                 m->version = 1; /* keep this hardcoded for now */
1640         }
1641         else {
1642                 strncpy(m->volume, name, namelen);
1643                 m->volume[namelen] = 0;
1644                 m->volumelen = namelen;
1645                 m->version = 0; /* version 0 should be pithos maps */
1646         }
1647         m->flags = 0;
1648         m->objects = xhash_new(3, INTEGER); 
1649         if (!m->objects){
1650                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1651                                 m->volume);
1652                 goto out_map;
1653         }
1654         m->ref = 1;
1655         m->waiters = 0;
1656         m->cond = st_cond_new(); //FIXME err check;
1657         r = insert_map(mapper, m);
1658         if (r < 0){
1659                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1660                 goto out_hash;
1661         }
1662
1663         return m;
1664
1665 out_hash:
1666         xhash_free(m->objects);
1667 out_map:
1668         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1669         free(m);
1670 out_err:
1671         return NULL;
1672 }
1673
1674
1675
1676 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1677 {
1678         struct peerd *peer = pr->peer;
1679         struct mapperd *mapper = __get_mapperd(peer);
1680         (void)mapper;
1681         struct mapper_io *mio = __get_mapper_io(pr);
1682         struct map_node *mn = __get_copyup_node(mio, req);
1683
1684         __set_copyup_node(mio, req, NULL);
1685
1686         //assert req->op = X_DELETE;
1687         //assert pr->req->op = X_DELETE only map deletions make delete requests
1688         //assert mio->del_pending > 0
1689         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1690         mio->del_pending--;
1691
1692         if (req->state & XS_FAILED){
1693                 mio->err = 1;
1694         }
1695         if (mn){
1696                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1697                                 mn, mn->object, mio, req);
1698                 // assert mn->flags & MF_OBJECT_DELETING
1699                 mn->flags &= ~MF_OBJECT_DELETING;
1700                 mn->flags |= MF_OBJECT_DESTROYED;
1701                 signal_mapnode(mn);
1702                 /* put mapnode here, matches get_mapnode on do_destroy */
1703                 put_mapnode(mn);
1704         } else {
1705                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1706                                 mio, req);
1707         }
1708         xseg_put_request(peer->xseg, req, pr->portno);
1709         signal_pr(pr);
1710 }
1711
1712 void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1713 {
1714         struct peerd *peer = pr->peer;
1715         struct mapperd *mapper = __get_mapperd(peer);
1716         (void)mapper;
1717         struct mapper_io *mio = __get_mapper_io(pr);
1718         struct map_node *mn = __get_copyup_node(mio, req);
1719         if (!mn){
1720                 XSEGLOG2(&lc, E, "Cannot get map node");
1721                 goto out_err;
1722         }
1723         __set_copyup_node(mio, req, NULL);
1724
1725         if (req->state & XS_FAILED){
1726                 if (req->op == X_DELETE){
1727                         XSEGLOG2(&lc, E, "Delete req failed");
1728                         goto out_ok;
1729                 }
1730                 XSEGLOG2(&lc, E, "Req failed");
1731                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1732                 mn->flags &= ~MF_OBJECT_WRITING;
1733                 goto out_err;
1734         }
1735
1736         if (req->op == X_WRITE) {
1737                 char old_object_name[MAX_OBJECT_LEN + 1];
1738                 uint32_t old_objectlen;
1739
1740                 char *target = xseg_get_target(peer->xseg, req);
1741                 (void)target;
1742                 //assert mn->flags & MF_OBJECT_WRITING
1743                 mn->flags &= ~MF_OBJECT_WRITING;
1744                 strncpy(old_object_name, mn->object, mn->objectlen);
1745                 old_objectlen = mn->objectlen;
1746
1747                 struct map_node tmp;
1748                 char *data = xseg_get_data(peer->xseg, req);
1749                 map_to_object(&tmp, (unsigned char *) data);
1750                 mn->flags &= ~MF_OBJECT_EXIST;
1751
1752                 strncpy(mn->object, tmp.object, tmp.objectlen);
1753                 mn->object[tmp.objectlen] = 0;
1754                 mn->objectlen = tmp.objectlen;
1755                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1756                 //signal_mapnode since Snapshot was successfull
1757                 signal_mapnode(mn);
1758                 /*
1759                 //do delete old object
1760                 strncpy(tmp.object, old_object_name, old_objectlen);
1761                 tmp.object[old_objectlen] = 0;
1762                 tmp.objectlen = old_objectlen;
1763                 tmp.flags = MF_OBJECT_EXIST;
1764                 struct xseg_request *xreq = __delete_object(pr, &tmp);
1765                 if (!xreq){
1766                         //just a warning. Snapshot was successfull
1767                         XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1768                         goto out_ok;
1769                 }
1770                 //overwrite copyup node, since tmp is a stack dummy variable
1771                 __set_copyup_node (mio, xreq, mn);
1772                 XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1773                 */
1774                 goto out_ok;
1775         } else if (req->op == X_SNAPSHOT) {
1776                 //issue write_object;
1777                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1778                 struct map *map = mn->map;
1779                 if (!map){
1780                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1781                         goto out_err;
1782                 }
1783
1784                 /* construct a tmp map_node for writing purposes */
1785                 //char *target = xseg_get_target(peer->xseg, req);
1786                 struct map_node newmn = *mn;
1787                 newmn.flags = 0;
1788                 struct xseg_reply_snapshot *xreply;
1789                 xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1790                 //assert xreply->targetlen !=0
1791                 //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1792                 //xreply->target[xreply->targetlen] = 0;
1793                 //assert xreply->target valid
1794                 strncpy(newmn.object, xreply->target, xreply->targetlen);
1795                 newmn.object[req->targetlen] = 0;
1796                 newmn.objectlen = req->targetlen;
1797                 newmn.objectidx = mn->objectidx;
1798                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1799                 if (!xreq){
1800                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1801                                         "\n\t of map %s [%llu]",
1802                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1803                         goto out_err;
1804                 }
1805                 mn->flags |= MF_OBJECT_WRITING;
1806                 __set_copyup_node (mio, xreq, mn);
1807
1808                 XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1809         } else if (req->op == X_DELETE){
1810                 //deletion of the old block completed
1811                 XSEGLOG2(&lc, I, "Deletion of completed");
1812                 goto out_ok;
1813                 ;
1814         } else {
1815                 //wtf??
1816                 ;
1817         }
1818
1819 out:
1820         xseg_put_request(peer->xseg, req, pr->portno);
1821         return;
1822
1823 out_err:
1824         mio->snap_pending--;
1825         XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1826         mio->err = 1;
1827         if (mn)
1828                 signal_mapnode(mn);
1829         signal_pr(pr);
1830         goto out;
1831
1832 out_ok:
1833         mio->snap_pending--;
1834         signal_pr(pr);
1835         goto out;
1836
1837
1838 }
1839 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1840 {
1841         struct peerd *peer = pr->peer;
1842         struct mapperd *mapper = __get_mapperd(peer);
1843         (void)mapper;
1844         struct mapper_io *mio = __get_mapper_io(pr);
1845         struct map_node *mn = __get_copyup_node(mio, req);
1846         if (!mn){
1847                 XSEGLOG2(&lc, E, "Cannot get map node");
1848                 goto out_err;
1849         }
1850         __set_copyup_node(mio, req, NULL);
1851
1852         if (req->state & XS_FAILED){
1853                 XSEGLOG2(&lc, E, "Req failed");
1854                 mn->flags &= ~MF_OBJECT_COPYING;
1855                 mn->flags &= ~MF_OBJECT_WRITING;
1856                 goto out_err;
1857         }
1858         if (req->op == X_WRITE) {
1859                 char *target = xseg_get_target(peer->xseg, req);
1860                 (void)target;
1861                 //printf("handle object write replyi\n");
1862                 __set_copyup_node(mio, req, NULL);
1863                 //assert mn->flags & MF_OBJECT_WRITING
1864                 mn->flags &= ~MF_OBJECT_WRITING;
1865
1866                 struct map_node tmp;
1867                 char *data = xseg_get_data(peer->xseg, req);
1868                 map_to_object(&tmp, (unsigned char *) data);
1869                 mn->flags |= MF_OBJECT_EXIST;
1870                 if (mn->flags != MF_OBJECT_EXIST){
1871                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1872                         goto out_err;
1873                 }
1874                 //assert mn->flags & MF_OBJECT_EXIST
1875                 strncpy(mn->object, tmp.object, tmp.objectlen);
1876                 mn->object[tmp.objectlen] = 0;
1877                 mn->objectlen = tmp.objectlen;
1878                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1879                 mio->copyups--;
1880                 signal_mapnode(mn);
1881                 signal_pr(pr);
1882         } else if (req->op == X_COPY) {
1883         //      issue write_object;
1884                 mn->flags &= ~MF_OBJECT_COPYING;
1885                 struct map *map = mn->map;
1886                 if (!map){
1887                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1888                         goto out_err;
1889                 }
1890
1891                 /* construct a tmp map_node for writing purposes */
1892                 char *target = xseg_get_target(peer->xseg, req);
1893                 struct map_node newmn = *mn;
1894                 newmn.flags = MF_OBJECT_EXIST;
1895                 strncpy(newmn.object, target, req->targetlen);
1896                 newmn.object[req->targetlen] = 0;
1897                 newmn.objectlen = req->targetlen;
1898                 newmn.objectidx = mn->objectidx; 
1899                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1900                 if (!xreq){
1901                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1902                                         "\n\t of map %s [%llu]",
1903                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1904                         goto out_err;
1905                 }
1906                 mn->flags |= MF_OBJECT_WRITING;
1907                 __set_copyup_node (mio, xreq, mn);
1908
1909                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1910         } else {
1911                 //wtf??
1912                 ;
1913         }
1914
1915 out:
1916         xseg_put_request(peer->xseg, req, pr->portno);
1917         return;
1918
1919 out_err:
1920         mio->copyups--;
1921         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1922         mio->err = 1;
1923         if (mn)
1924                 signal_mapnode(mn);
1925         signal_pr(pr);
1926         goto out;
1927
1928 }
1929
1930 struct r2o {
1931         struct map_node *mn;
1932         uint64_t offset;
1933         uint64_t size;
1934 };
1935
1936 static int req2objs(struct peer_req *pr, struct map *map, int write)
1937 {
1938         int r = 0;
1939         struct peerd *peer = pr->peer;
1940         struct mapper_io *mio = __get_mapper_io(pr);
1941         char *target = xseg_get_target(peer->xseg, pr->req);
1942         uint32_t nr_objs = calc_nr_obj(pr->req);
1943         uint64_t size = sizeof(struct xseg_reply_map) + 
1944                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1945         uint32_t idx, i;
1946         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1947         struct map_node *mn;
1948         mio->copyups = 0;
1949         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1950
1951         /* get map_nodes of request */
1952         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1953         if (!mns){
1954                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1955                 return -1;
1956         }
1957         idx = 0;
1958         rem_size = pr->req->size;
1959         obj_index = pr->req->offset / block_size;
1960         obj_offset = pr->req->offset & (block_size -1); //modulo
1961         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1962         mn = get_mapnode(map, obj_index);
1963         if (!mn) {
1964                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1965                 r = -1;
1966                 goto out;
1967         }
1968         mns[idx].mn = mn;
1969         mns[idx].offset = obj_offset;
1970         mns[idx].size = obj_size;
1971         rem_size -= obj_size;
1972         while (rem_size > 0) {
1973                 idx++;
1974                 obj_index++;
1975                 obj_offset = 0;
1976                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1977                 rem_size -= obj_size;
1978                 mn = get_mapnode(map, obj_index);
1979                 if (!mn) {
1980                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1981                         r = -1;
1982                         goto out;
1983                 }
1984                 mns[idx].mn = mn;
1985                 mns[idx].offset = obj_offset;
1986                 mns[idx].size = obj_size;
1987         }
1988         if (write) {
1989                 int can_wait = 0;
1990                 mio->cb=copyup_cb;
1991                 /* do a first scan and issue as many copyups as we can.
1992                  * then retry and wait when an object is not ready.
1993                  * this could be done better, since now we wait also on the
1994                  * pending copyups
1995                  */
1996                 int j;
1997                 for (j = 0; j < 2 && !mio->err; j++) {
1998                         for (i = 0; i < (idx+1); i++) {
1999                                 mn = mns[i].mn;
2000                                 //do copyups
2001                                 if (mn->flags & MF_OBJECT_NOT_READY){
2002                                         if (!can_wait)
2003                                                 continue;
2004                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2005                                         if (mn->flags & MF_OBJECT_DESTROYED){
2006                                                 mio->err = 1;
2007                                                 continue;
2008                                         }
2009                                 }
2010
2011                                 if (!(mn->flags & MF_OBJECT_EXIST)) {
2012                                         //calc new_target, copy up object
2013                                         if (copyup_object(peer, mn, pr) == NULL){
2014                                                 XSEGLOG2(&lc, E, "Error in copy up object");
2015                                                 mio->err = 1;
2016                                         } else {
2017                                                 mio->copyups++;
2018                                         }
2019                                 }
2020
2021                                 if (mio->err){
2022                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2023                                         break;
2024                                 }
2025                         }
2026                         can_wait = 1;
2027                 }
2028                 wait_on_pr(pr, mio->copyups > 0);
2029         }
2030
2031         if (mio->err){
2032                 r = -1;
2033                 XSEGLOG2(&lc, E, "Mio->err");
2034                 goto out;
2035         }
2036
2037         /* resize request to fit reply */
2038         char buf[XSEG_MAX_TARGETLEN];
2039         strncpy(buf, target, pr->req->targetlen);
2040         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2041         if (r < 0) {
2042                 XSEGLOG2(&lc, E, "Cannot resize request");
2043                 goto out;
2044         }
2045         target = xseg_get_target(peer->xseg, pr->req);
2046         strncpy(target, buf, pr->req->targetlen);
2047
2048         /* structure reply */
2049         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2050         reply->cnt = nr_objs;
2051         for (i = 0; i < (idx+1); i++) {
2052                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2053                 reply->segs[i].targetlen = mns[i].mn->objectlen;
2054                 reply->segs[i].offset = mns[i].offset;
2055                 reply->segs[i].size = mns[i].size;
2056         }
2057 out:
2058         for (i = 0; i < idx; i++) {
2059                 put_mapnode(mns[i].mn);
2060         }
2061         free(mns);
2062         mio->cb = NULL;
2063         return r;
2064 }
2065
2066 static int do_dropcache(struct peer_req *pr, struct map *map)
2067 {
2068         struct map_node *mn;
2069         struct peerd *peer = pr->peer;
2070         struct mapperd *mapper = __get_mapperd(peer);
2071         uint64_t i;
2072         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2073         map->flags |= MF_MAP_DROPPING_CACHE;
2074         for (i = 0; i < calc_map_obj(map); i++) {
2075                 mn = get_mapnode(map, i);
2076                 if (mn) {
2077                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
2078                                 //make sure all pending operations on all objects are completed
2079                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2080                                 mn->flags |= MF_OBJECT_DESTROYED;
2081                         }
2082                         put_mapnode(mn);
2083                 }
2084         }
2085         map->flags &= ~MF_MAP_DROPPING_CACHE;
2086         map->flags |= MF_MAP_DESTROYED;
2087         remove_map(mapper, map);
2088         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2089         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
2090         return 0;
2091 }
2092
2093 static int do_info(struct peer_req *pr, struct map *map)
2094 {
2095         struct peerd *peer = pr->peer;
2096         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2097         xinfo->size = map->size;
2098         return 0;
2099 }
2100
2101
2102 static int do_open(struct peer_req *pr, struct map *map)
2103 {
2104         if (map->flags & MF_MAP_EXCLUSIVE){
2105                 return 0;
2106         }
2107         else {
2108                 return -1;
2109         }
2110 }
2111
2112 static int do_close(struct peer_req *pr, struct map *map)
2113 {
2114         if (map->flags & MF_MAP_EXCLUSIVE){
2115                 /* do not drop cache if close failed and map not deleted */
2116                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2117                         return -1;
2118         }
2119         return do_dropcache(pr, map);
2120 }
2121
2122 static int do_snapshot(struct peer_req *pr, struct map *map)
2123 {
2124         uint64_t i;
2125         struct peerd *peer = pr->peer;
2126         struct mapper_io *mio = __get_mapper_io(pr);
2127         struct map_node *mn;
2128         struct xseg_request *req;
2129
2130         if (!(map->flags & MF_MAP_EXCLUSIVE)){
2131                 XSEGLOG2(&lc, E, "Map was not opened exclusively");
2132                 return -1;
2133         }
2134         XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2135         map->flags |= MF_MAP_SNAPSHOTTING;
2136
2137         uint64_t nr_obj = calc_map_obj(map);
2138         mio->cb = snapshot_cb;
2139         mio->snap_pending = 0;
2140         mio->err = 0;
2141         for (i = 0; i < nr_obj; i++){
2142
2143                 /* throttle pending snapshots
2144                  * this should be nr_ops of the blocker, but since we don't know
2145                  * that, we assume based on our own nr_ops
2146                  */
2147                 wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2148
2149                 mn = get_mapnode(map, i);
2150                 if (!mn)
2151                         //warning?
2152                         continue;
2153                 if (!(mn->flags & MF_OBJECT_EXIST)){
2154                         put_mapnode(mn);
2155                         continue;
2156                 }
2157                 // make sure all pending operations on all objects are completed
2158                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2159
2160                 /* TODO will this ever happen?? */
2161                 if (mn->flags & MF_OBJECT_DESTROYED){
2162                         put_mapnode(mn);
2163                         continue;
2164                 }
2165
2166                 req = __snapshot_object(pr, mn);
2167                 if (!req){
2168                         mio->err = 1;
2169                         put_mapnode(mn);
2170                         break;
2171                 }
2172                 mio->snap_pending++;
2173                 /* do not put_mapnode here. cb does that */
2174         }
2175
2176         wait_on_pr(pr, mio->snap_pending > 0);
2177         mio->cb = NULL;
2178
2179         if (mio->err)
2180                 goto out_err;
2181
2182         /* calculate name of snapshot */
2183         struct map tmp_map = *map;
2184         unsigned char sha[SHA256_DIGEST_SIZE];
2185         unsigned char *buf = malloc(block_size);
2186         char newvolumename[MAX_VOLUME_LEN];
2187         uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2188         uint64_t pos = 0;
2189         uint64_t max_objidx = calc_map_obj(map);
2190         int r;
2191
2192         for (i = 0; i < max_objidx; i++) {
2193                 mn = find_object(map, i);
2194                 if (!mn){
2195                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2196                                         (unsigned long long) i, map->volume);
2197                         goto out_err;
2198                 }
2199                 v0_object_to_map(mn, buf+pos);
2200                 pos += v0_objectsize_in_map;
2201         }
2202         SHA256(buf, pos, sha);
2203         hexlify(sha, newvolumename);
2204         strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2205         tmp_map.volumelen = newvolumenamelen;
2206         free(buf);
2207         tmp_map.version = 0; // set volume version to pithos image
2208
2209         /* write the map of the Snapshot */
2210         r = write_map(pr, &tmp_map);
2211         if (r < 0)
2212                 goto out_err;
2213
2214         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2215                         sizeof(struct xseg_reply_snapshot));
2216         if (r < 0){
2217                 XSEGLOG2(&lc, E, "Cannot resize request");
2218                 goto out_err;
2219         }
2220         struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2221                                                 xseg_get_data(peer->xseg, pr->req);
2222         strncpy(xreply->target, newvolumename, newvolumenamelen);
2223         xreply->targetlen = newvolumenamelen;
2224         map->flags &= ~MF_MAP_SNAPSHOTTING;
2225         XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2226         return 0;
2227
2228 out_err:
2229         map->flags &= ~MF_MAP_SNAPSHOTTING;
2230         XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2231         return -1;
2232 }
2233
2234
2235 static int do_destroy(struct peer_req *pr, struct map *map)
2236 {
2237         uint64_t i;
2238         struct peerd *peer = pr->peer;
2239         struct mapper_io *mio = __get_mapper_io(pr);
2240         struct map_node *mn;
2241         struct xseg_request *req;
2242
2243         if (!(map->flags & MF_MAP_EXCLUSIVE))
2244                 return -1;
2245
2246         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
2247         req = __delete_map(pr, map);
2248         if (!req)
2249                 return -1;
2250         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
2251         if (req->state & XS_FAILED){
2252                 xseg_put_request(peer->xseg, req, pr->portno);
2253                 map->flags &= ~MF_MAP_DELETING;
2254                 return -1;
2255         }
2256         xseg_put_request(peer->xseg, req, pr->portno);
2257
2258         uint64_t nr_obj = calc_map_obj(map);
2259         mio->cb = deletion_cb;
2260         mio->del_pending = 0;
2261         mio->err = 0;
2262         for (i = 0; i < nr_obj; i++){
2263
2264                 /* throttle pending deletions
2265                  * this should be nr_ops of the blocker, but since we don't know
2266                  * that, we assume based on our own nr_ops
2267                  */
2268                 wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
2269
2270                 mn = get_mapnode(map, i);
2271                 if (!mn)
2272                         continue;
2273                 if (mn->flags & MF_OBJECT_DESTROYED){
2274                         put_mapnode(mn);
2275                         continue;
2276                 }
2277                 if (!(mn->flags & MF_OBJECT_EXIST)){
2278                         mn->flags |= MF_OBJECT_DESTROYED;
2279                         put_mapnode(mn);
2280                         continue;
2281                 }
2282
2283                 // make sure all pending operations on all objects are completed
2284                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2285
2286                 req = __delete_object(pr, mn);
2287                 if (!req){
2288                         mio->err = 1;
2289                         put_mapnode(mn);
2290                         continue;
2291                 }
2292                 mio->del_pending++;
2293                 /* do not put_mapnode here. cb does that */
2294         }
2295
2296         wait_on_pr(pr, mio->del_pending > 0);
2297
2298         mio->cb = NULL;
2299         map->flags &= ~MF_MAP_DELETING;
2300         map->flags |= MF_MAP_DELETED;
2301         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
2302         return do_close(pr, map);
2303 }
2304
2305 static int do_mapr(struct peer_req *pr, struct map *map)
2306 {
2307         struct peerd *peer = pr->peer;
2308         int r = req2objs(pr, map, 0);
2309         if  (r < 0){
2310                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2311                                 map->volume, 
2312                                 (unsigned long long) pr->req->offset, 
2313                                 (unsigned long long) (pr->req->offset + pr->req->size));
2314                 return -1;
2315         }
2316         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2317                         map->volume, 
2318                         (unsigned long long) pr->req->offset, 
2319                         (unsigned long long) (pr->req->offset + pr->req->size));
2320         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2321                         (unsigned long long) pr->req->offset,
2322                         (unsigned long long) pr->req->size);
2323         char buf[XSEG_MAX_TARGETLEN+1];
2324         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2325         int i;
2326         for (i = 0; i < reply->cnt; i++) {
2327                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2328                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2329                 buf[reply->segs[i].targetlen] = 0;
2330                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2331                                 (unsigned long long) reply->segs[i].offset,
2332                                 (unsigned long long) reply->segs[i].size);
2333         }
2334         return 0;
2335 }
2336
2337 static int do_mapw(struct peer_req *pr, struct map *map)
2338 {
2339         struct peerd *peer = pr->peer;
2340         int r = req2objs(pr, map, 1);
2341         if  (r < 0){
2342                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2343                                 map->volume, 
2344                                 (unsigned long long) pr->req->offset, 
2345                                 (unsigned long long) (pr->req->offset + pr->req->size));
2346                 return -1;
2347         }
2348         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2349                         map->volume, 
2350                         (unsigned long long) pr->req->offset, 
2351                         (unsigned long long) (pr->req->offset + pr->req->size));
2352         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2353                         (unsigned long long) pr->req->offset,
2354                         (unsigned long long) pr->req->size);
2355         char buf[XSEG_MAX_TARGETLEN+1];
2356         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2357         int i;
2358         for (i = 0; i < reply->cnt; i++) {
2359                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2360                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2361                 buf[reply->segs[i].targetlen] = 0;
2362                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2363                                 (unsigned long long) reply->segs[i].offset,
2364                                 (unsigned long long) reply->segs[i].size);
2365         }
2366         return 0;
2367 }
2368
2369 //here map is the parent map
2370 static int do_clone(struct peer_req *pr, struct map *map)
2371 {
2372         int r;
2373         struct peerd *peer = pr->peer;
2374         struct mapperd *mapper = __get_mapperd(peer);
2375         char *target = xseg_get_target(peer->xseg, pr->req);
2376         struct map *clonemap;
2377         struct xseg_request_clone *xclone =
2378                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2379
2380         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
2381
2382         clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2383         if (!clonemap)
2384                 return -1;
2385
2386         /* open map to get exclusive access to map */
2387         r = open_map(pr, clonemap, 0);
2388         if (r < 0){
2389                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
2390                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2391                 goto out_err;
2392         }
2393         r = load_map(pr, clonemap);
2394         if (r >= 0) {
2395                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2396                 goto out_err;
2397         }
2398
2399         if (xclone->size == -1)
2400                 clonemap->size = map->size;
2401         else
2402                 clonemap->size = xclone->size;
2403         if (clonemap->size < map->size){
2404                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
2405                                 "\n\t for requested clone %s",
2406                                 (unsigned long long) xclone->size,
2407                                 (unsigned long long) map->size, clonemap->volume);
2408                 goto out_err;
2409         }
2410         if (clonemap->size > MAX_VOLUME_SIZE) {
2411                 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
2412                                 "\n\t for volume %s",
2413                                 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
2414                 goto out_err;
2415         }
2416
2417         //alloc and init map_nodes
2418         //unsigned long c = clonemap->size/block_size + 1;
2419         unsigned long c = calc_map_obj(clonemap);
2420         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
2421         if (!map_nodes){
2422                 goto out_err;
2423         }
2424         int i;
2425         //for (i = 0; i < clonemap->size/block_size + 1; i++) {
2426         for (i = 0; i < c; i++) {
2427                 struct map_node *mn = get_mapnode(map, i);
2428                 if (mn) {
2429                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
2430                         map_nodes[i].objectlen = mn->objectlen;
2431                         put_mapnode(mn);
2432                 } else {
2433                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2434                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2435                 }
2436                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2437                 map_nodes[i].flags = 0;
2438                 map_nodes[i].objectidx = i;
2439                 map_nodes[i].map = clonemap;
2440                 map_nodes[i].ref = 1;
2441                 map_nodes[i].waiters = 0;
2442                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2443                 r = insert_object(clonemap, &map_nodes[i]);
2444                 if (r < 0){
2445                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
2446                         goto out_err;
2447                 }
2448         }
2449
2450         r = write_map(pr, clonemap);
2451         if (r < 0){
2452                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
2453                 goto out_err;
2454         }
2455         do_close(pr, clonemap);
2456         return 0;
2457
2458 out_err:
2459         do_close(pr, clonemap);
2460         return -1;
2461 }
2462
2463 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2464 {
2465         int r, opened = 0;
2466         if (flags & MF_EXCLUSIVE){
2467                 r = open_map(pr, map, flags);
2468                 if (r < 0) {
2469                         if (flags & MF_FORCE){
2470                                 return -1;
2471                         }
2472                 } else {
2473                         opened = 1;
2474                 }
2475         }
2476         r = load_map(pr, map);
2477         if (r < 0 && opened){
2478                 close_map(pr, map);
2479         }
2480         return r;
2481 }
2482
2483 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2484                         uint32_t flags)
2485 {
2486         int r;
2487         struct peerd *peer = pr->peer;
2488         struct mapperd *mapper = __get_mapperd(peer);
2489         struct map *map = find_map_len(mapper, name, namelen, flags);
2490         if (!map){
2491                 if (flags & MF_LOAD){
2492                         map = create_map(mapper, name, namelen, flags);
2493                         if (!map)
2494                                 return NULL;
2495                         r = open_load_map(pr, map, flags);
2496                         if (r < 0){
2497                                 do_dropcache(pr, map);
2498                                 return NULL;
2499                         }
2500                 } else {
2501                         return NULL;
2502                 }
2503         } else if (map->flags & MF_MAP_DESTROYED){
2504                 return NULL;
2505         }
2506         __get_map(map);
2507         return map;
2508
2509 }
2510
2511 static int map_action(int (action)(struct peer_req *pr, struct map *map),
2512                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2513 {
2514         //struct peerd *peer = pr->peer;
2515         struct map *map;
2516 start:
2517         map = get_map(pr, name, namelen, flags);
2518         if (!map)
2519                 return -1;
2520         if (map->flags & MF_MAP_NOT_READY){
2521                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2522                 put_map(map);
2523                 goto start;
2524         }
2525         int r = action(pr, map);
2526         //always drop cache if map not read exclusively
2527         if (!(map->flags & MF_MAP_EXCLUSIVE))
2528                 do_dropcache(pr, map);
2529         signal_map(map);
2530         put_map(map);
2531         return r;
2532 }
2533
2534 void * handle_info(struct peer_req *pr)
2535 {
2536         struct peerd *peer = pr->peer;
2537         char *target = xseg_get_target(peer->xseg, pr->req);
2538         int r = map_action(do_info, pr, target, pr->req->targetlen,
2539                                 MF_ARCHIP|MF_LOAD);
2540         if (r < 0)
2541                 fail(peer, pr);
2542         else
2543                 complete(peer, pr);
2544         ta--;
2545         return NULL;
2546 }
2547
2548 void * handle_clone(struct peer_req *pr)
2549 {
2550         int r;
2551         struct peerd *peer = pr->peer;
2552         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2553         if (!xclone) {
2554                 r = -1;
2555                 goto out;
2556         }
2557
2558         if (xclone->targetlen){
2559                 /* if snap was defined */
2560                 //support clone only from pithos
2561                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
2562                                         MF_LOAD);
2563         } else {
2564                 /* else try to create a new volume */
2565                 XSEGLOG2(&lc, I, "Creating volume");
2566                 if (!xclone->size){
2567                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
2568                         r = -1;
2569                         goto out;
2570                 }
2571                 if (xclone->size > MAX_VOLUME_SIZE) {
2572                         XSEGLOG2(&lc, E, "Requested size %llu > max volume "
2573                                         "size %llu", xclone->size, MAX_VOLUME_SIZE);
2574                         r = -1;
2575                         goto out;
2576                 }
2577
2578                 struct map *map;
2579                 char *target = xseg_get_target(peer->xseg, pr->req);
2580
2581                 //create a new empty map of size
2582                 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2583                 if (!map){
2584                         r = -1;
2585                         goto out;
2586                 }
2587                 /* open map to get exclusive access to map */
2588                 r = open_map(pr, map, 0);
2589                 if (r < 0){
2590                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
2591                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
2592                         do_dropcache(pr, map);
2593                         r = -1;
2594                         goto out;
2595                 }
2596                 r = load_map(pr, map);
2597                 if (r >= 0) {
2598                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
2599                         do_close(pr, map);
2600                         r = -1;
2601                         goto out;
2602                 }
2603                 map->size = xclone->size;
2604                 //populate_map with zero objects;
2605                 uint64_t nr_objs = xclone->size / block_size;
2606                 if (xclone->size % block_size)
2607                         nr_objs++;
2608
2609                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2610                 if (!map_nodes){
2611                         do_close(pr, map);
2612                         r = -1;
2613                         goto out;
2614                 }
2615
2616                 uint64_t i;
2617                 for (i = 0; i < nr_objs; i++) {
2618                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2619                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2620                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2621                         map_nodes[i].flags = 0;
2622                         map_nodes[i].objectidx = i;
2623                         map_nodes[i].map = map;
2624                         map_nodes[i].ref = 1;
2625                         map_nodes[i].waiters = 0;
2626                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2627                         r = insert_object(map, &map_nodes[i]);
2628                         if (r < 0){
2629                                 do_close(pr, map);
2630                                 r = -1;
2631                                 goto out;
2632                         }
2633                 }
2634                 r = write_map(pr, map);
2635                 if (r < 0){
2636                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2637                         do_close(pr, map);
2638                         goto out;
2639                 }
2640                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2641                 r = 0;
2642                 do_close(pr, map); //drop cache here for consistency
2643         }
2644 out:
2645         if (r < 0)
2646                 fail(peer, pr);
2647         else
2648                 complete(peer, pr);
2649         ta--;
2650         return NULL;
2651 }
2652
2653 void * handle_mapr(struct peer_req *pr)
2654 {
2655         struct peerd *peer = pr->peer;
2656         char *target = xseg_get_target(peer->xseg, pr->req);
2657         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2658                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2659         if (r < 0)
2660                 fail(peer, pr);
2661         else
2662                 complete(peer, pr);
2663         ta--;
2664         return NULL;
2665 }
2666
2667 void * handle_mapw(struct peer_req *pr)
2668 {
2669         struct peerd *peer = pr->peer;
2670         char *target = xseg_get_target(peer->xseg, pr->req);
2671         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2672                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2673         if (r < 0)
2674                 fail(peer, pr);
2675         else
2676                 complete(peer, pr);
2677         XSEGLOG2(&lc, D, "Ta: %d", ta);
2678         ta--;
2679         return NULL;
2680 }
2681
2682 void * handle_destroy(struct peer_req *pr)
2683 {
2684         struct peerd *peer = pr->peer;
2685         char *target = xseg_get_target(peer->xseg, pr->req);
2686         /* request EXCLUSIVE access, but do not force it.
2687          * check if succeeded on do_destroy
2688          */
2689         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2690                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2691         if (r < 0)
2692                 fail(peer, pr);
2693         else
2694                 complete(peer, pr);
2695         ta--;
2696         return NULL;
2697 }
2698
2699 void * handle_open(struct peer_req *pr)
2700 {
2701         struct peerd *peer = pr->peer;
2702         char *target = xseg_get_target(peer->xseg, pr->req);
2703         //here we do not want to load
2704         int r = map_action(do_open, pr, target, pr->req->targetlen,
2705                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2706         if (r < 0)
2707                 fail(peer, pr);
2708         else
2709                 complete(peer, pr);
2710         ta--;
2711         return NULL;
2712 }
2713
2714 void * handle_close(struct peer_req *pr)
2715 {
2716         struct peerd *peer = pr->peer;
2717         char *target = xseg_get_target(peer->xseg, pr->req);
2718         //here we do not want to load
2719         int r = map_action(do_close, pr, target, pr->req->targetlen,
2720                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2721         if (r < 0)
2722                 fail(peer, pr);
2723         else
2724                 complete(peer, pr);
2725         ta--;
2726         return NULL;
2727 }
2728
2729 void * handle_snapshot(struct peer_req *pr)
2730 {
2731         struct peerd *peer = pr->peer;
2732         char *target = xseg_get_target(peer->xseg, pr->req);
2733         /* request EXCLUSIVE access, but do not force it.
2734          * check if succeeded on do_destroy
2735          */
2736         int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2737                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2738         if (r < 0)
2739                 fail(peer, pr);
2740         else
2741                 complete(peer, pr);
2742         ta--;
2743         return NULL;
2744 }
2745
2746 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2747                         struct xseg_request *req)
2748 {
2749         //struct mapperd *mapper = __get_mapperd(peer);
2750         struct mapper_io *mio = __get_mapper_io(pr);
2751         void *(*action)(struct peer_req *) = NULL;
2752
2753         mio->state = ACCEPTED;
2754         mio->err = 0;
2755         mio->cb = NULL;
2756         switch (pr->req->op) {
2757                 /* primary xseg operations of mapper */
2758                 case X_CLONE: action = handle_clone; break;
2759                 case X_MAPR: action = handle_mapr; break;
2760                 case X_MAPW: action = handle_mapw; break;
2761                 case X_SNAPSHOT: action = handle_snapshot; break;
2762                 case X_INFO: action = handle_info; break;
2763                 case X_DELETE: action = handle_destroy; break;
2764                 case X_OPEN: action = handle_open; break;
2765                 case X_CLOSE: action = handle_close; break;
2766                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2767         }
2768         if (action){
2769                 ta++;
2770                 mio->active = 1;
2771                 st_thread_create(action, pr, 0, 0);
2772         }
2773         return 0;
2774
2775 }
2776
2777 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2778                 enum dispatch_reason reason)
2779 {
2780         struct mapperd *mapper = __get_mapperd(peer);
2781         (void) mapper;
2782         struct mapper_io *mio = __get_mapper_io(pr);
2783         (void) mio;
2784
2785
2786         if (reason == dispatch_accept)
2787                 dispatch_accepted(peer, pr, req);
2788         else {
2789                 if (mio->cb){
2790                         mio->cb(pr, req);
2791                 } else { 
2792                         signal_pr(pr);
2793                 }
2794         }
2795         return 0;
2796 }
2797
2798 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2799 {
2800         int i;
2801
2802         //FIXME error checks
2803         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2804         peer->priv = mapperd;
2805         mapper = mapperd;
2806         mapper->hashmaps = xhash_new(3, STRING);
2807
2808         for (i = 0; i < peer->nr_ops; i++) {
2809                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2810                 mio->copyups_nodes = xhash_new(3, INTEGER);
2811                 mio->copyups = 0;
2812                 mio->err = 0;
2813                 mio->active = 0;
2814                 peer->peer_reqs[i].priv = mio;
2815         }
2816
2817         mapper->bportno = -1;
2818         mapper->mbportno = -1;
2819         BEGIN_READ_ARGS(argc, argv);
2820         READ_ARG_ULONG("-bp", mapper->bportno);
2821         READ_ARG_ULONG("-mbp", mapper->mbportno);
2822         END_READ_ARGS();
2823         if (mapper->bportno == -1){
2824                 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2825                 usage(argv[0]);
2826                 return -1;
2827         }
2828         if (mapper->mbportno == -1){
2829                 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2830                 usage(argv[0]);
2831                 return -1;
2832         }
2833
2834         const struct sched_param param = { .sched_priority = 99 };
2835         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2836         /* FIXME maybe place it in peer
2837          * should be done for each port (sportno to eportno)
2838          */
2839         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2840         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2841
2842
2843 //      test_map(peer);
2844
2845         return 0;
2846 }
2847
2848 /* FIXME this should not be here */
2849 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2850 {
2851         struct xseg *xseg = peer->xseg;
2852         xport portno_start = peer->portno_start;
2853         xport portno_end = peer->portno_end;
2854         struct peer_req *pr;
2855         xport i;
2856         int  r, c = 0;
2857         struct xseg_request *received;
2858         xseg_prepare_wait(xseg, portno_start);
2859         while(1) {
2860                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2861                 c = 1;
2862                 while (c){
2863                         c = 0;
2864                         for (i = portno_start; i <= portno_end; i++) {
2865                                 received = xseg_receive(xseg, i, 0);
2866                                 if (received) {
2867                                         c = 1;
2868                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2869                                         if (r < 0 || !pr || received != expected_req){
2870                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2871                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2872                                                 if (p == NoPort){
2873                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2874                                                         xseg_put_request(xseg, received, portno_start);
2875                                                         continue;
2876                                                 } else {
2877                                                         xseg_signal(xseg, p);
2878                                                 }
2879                                         } else {
2880                                                 xseg_cancel_wait(xseg, portno_start);
2881                                                 return 0;
2882                                         }
2883                                 }
2884                         }
2885                 }
2886                 xseg_wait_signal(xseg, 1000000UL);
2887         }
2888 }
2889
2890
2891 void custom_peer_finalize(struct peerd *peer)
2892 {
2893         struct mapperd *mapper = __get_mapperd(peer);
2894         struct peer_req *pr = alloc_peer_req(peer);
2895         if (!pr){
2896                 XSEGLOG2(&lc, E, "Cannot get peer request");
2897                 return;
2898         }
2899         struct map *map;
2900         struct xseg_request *req;
2901         xhash_iter_t it;
2902         xhashidx key, val;
2903         xhash_iter_init(mapper->hashmaps, &it);
2904         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2905                 map = (struct map *)val;
2906                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2907                         continue;
2908                 req = __close_map(pr, map);
2909                 if (!req)
2910                         continue;
2911                 wait_reply(peer, req);
2912                 if (!(req->state & XS_SERVED))
2913                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2914                 map->flags &= ~MF_MAP_CLOSING;
2915                 xseg_put_request(peer->xseg, req, pr->portno);
2916         }
2917         return;
2918
2919
2920 }
2921
2922 void print_obj(struct map_node *mn)
2923 {
2924         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2925                         (unsigned long long) mn->objectidx, mn->object, 
2926                         (unsigned int) mn->objectlen, 
2927                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2928 }
2929
2930 void print_map(struct map *m)
2931 {
2932         uint64_t nr_objs = m->size/block_size;
2933         if (m->size % block_size)
2934                 nr_objs++;
2935         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2936                         m->volume, m->volumelen, 
2937                         (unsigned long long) m->size, 
2938                         (unsigned long long) nr_objs,
2939                         m->version);
2940         uint64_t i;
2941         struct map_node *mn;
2942         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2943                 return;
2944         for (i = 0; i < nr_objs; i++) {
2945                 mn = find_object(m, i);
2946                 if (!mn){
2947                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2948                         continue;
2949                 }
2950                 print_obj(mn);
2951         }
2952 }
2953
2954 /*
2955 void test_map(struct peerd *peer)
2956 {
2957         int i,j, ret;
2958         //struct sha256_ctx sha256ctx;
2959         unsigned char buf[SHA256_DIGEST_SIZE];
2960         char buf_new[XSEG_MAX_TARGETLEN + 20];
2961         struct map *m = malloc(sizeof(struct map));
2962         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2963         m->volume[XSEG_MAX_TARGETLEN] = 0;
2964         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2965         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2966         m->volumelen = XSEG_MAX_TARGETLEN;
2967         m->size = 100*block_size;
2968         m->objects = xhash_new(3, INTEGER);
2969         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2970         for (i = 0; i < 100; i++) {
2971                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2972                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2973                 
2974                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2975                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2976                 }
2977                 map_node[i].objectidx = i;
2978                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2979                 map_node[i].flags = MF_OBJECT_EXIST;
2980                 ret = insert_object(m, &map_node[i]);
2981         }
2982
2983         char *data = malloc(block_size);
2984         mapheader_to_map(m, data);
2985         uint64_t pos = mapheader_size;
2986
2987         for (i = 0; i < 100; i++) {
2988                 map_node = find_object(m, i);
2989                 if (!map_node){
2990                         printf("no object node %d \n", i);
2991                         exit(1);
2992                 }
2993                 object_to_map(data+pos, map_node);
2994                 pos += objectsize_in_map;
2995         }
2996 //      print_map(m);
2997
2998         struct map *m2 = malloc(sizeof(struct map));
2999         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3000         m->volume[XSEG_MAX_TARGETLEN] = 0;
3001         m->volumelen = XSEG_MAX_TARGETLEN;
3002
3003         m2->objects = xhash_new(3, INTEGER);
3004         ret = read_map(peer, m2, data);
3005 //      print_map(m2);
3006
3007         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3008         ssize_t r, sum = 0;
3009         while (sum < block_size) {
3010                 r = write(fd, data + sum, block_size -sum);
3011                 if (r < 0){
3012                         perror("write");
3013                         printf("write error\n");
3014                         exit(1);
3015                 } 
3016                 sum += r;
3017         }
3018         close(fd);
3019         map_node = find_object(m, 0);
3020         free(map_node);
3021         free(m);
3022 }
3023 */