8e24104f4cd585a4e9c25582745829b7a076d2e6
[archipelago] / xseg / peers / user / mt-mapperd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <pthread.h>
39 #include <xseg/xseg.h>
40 #include <peer.h>
41 #include <time.h>
42 #include <xtypes/xlock.h>
43 #include <xtypes/xhash.h>
44 #include <xseg/protocol.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <sched.h>
49 #include <sys/syscall.h>
50 #include <openssl/sha.h>
51 #include <ctype.h>
52
53 /* general mapper flags */
54 #define MF_LOAD         (1 << 0)
55 #define MF_EXCLUSIVE    (1 << 1)
56 #define MF_FORCE        (1 << 2)
57 #define MF_ARCHIP       (1 << 3)
58
59 #ifndef SHA256_DIGEST_SIZE
60 #define SHA256_DIGEST_SIZE 32
61 #endif
62 /* hex representation of sha256 value takes up double the sha256 size */
63 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64
65 #define block_size (1<<22) //FIXME this should be defined here?
66
67 /* transparency byte + max object len in disk */
68 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69
70 /* Map header contains:
71  *      map version
72  *      volume size
73  */
74 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75
76
77 #define MAPPER_PREFIX "archip_"
78 #define MAPPER_PREFIX_LEN 7
79
80 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82
83 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85 #endif
86
87 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88
89 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91 #endif
92
93 #define MAX_VOLUME_SIZE \
94 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95
96
97 //char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98
99 /* pithos considers this a block full of zeros, so should we.
100  * it is actually the sha256 hash of nothing.
101  */
102 char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104
105 /* dispatch_internal mapper states */
106 enum mapper_state {
107         ACCEPTED = 0,
108         WRITING = 1,
109         COPYING = 2,
110         DELETING = 3,
111         DROPPING_CACHE = 4
112 };
113
114 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115
116
117 /* mapper object flags */
118 #define MF_OBJECT_EXIST         (1 << 0)
119 #define MF_OBJECT_COPYING       (1 << 1)
120 #define MF_OBJECT_WRITING       (1 << 2)
121 #define MF_OBJECT_DELETING      (1 << 3)
122 #define MF_OBJECT_DESTROYED     (1 << 5)
123 #define MF_OBJECT_SNAPSHOTTING  (1 << 6)
124
125 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126                                 MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127 struct map_node {
128         uint32_t flags;
129         uint32_t objectidx;
130         uint32_t objectlen;
131         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
132         struct map *map;
133         uint32_t ref;
134         uint32_t waiters;
135         st_cond_t cond;
136 };
137
138
139 #define wait_on_pr(__pr, __condition__)         \
140         while (__condition__){                  \
141                 ta--;                           \
142                 __get_mapper_io(pr)->active = 0;\
143                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144                 st_cond_wait(__pr->cond);       \
145         }
146
147 #define wait_on_mapnode(__mn, __condition__)    \
148         while (__condition__){                  \
149                 ta--;                           \
150                 __mn->waiters++;                \
151                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153                 st_cond_wait(__mn->cond);       \
154         }
155
156 #define wait_on_map(__map, __condition__)       \
157         while (__condition__){                  \
158                 ta--;                           \
159                 __map->waiters++;               \
160                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161                                    __map, __map->volume, __map->waiters, ta); \
162                 st_cond_wait(__map->cond);      \
163         }
164
165 #define signal_pr(__pr)                         \
166         do {                                    \
167                 if (!__get_mapper_io(pr)->active){\
168                         ta++;                   \
169                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170                         __get_mapper_io(pr)->active = 1;\
171                         st_cond_signal(__pr->cond);     \
172                 }                               \
173         }while(0)
174
175 #define signal_map(__map)                       \
176         do {                                    \
177                 if (__map->waiters) {           \
178                         ta += 1;                \
179                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
180                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
181                         __map->waiters--;       \
182                         st_cond_signal(__map->cond);    \
183                 }                               \
184         }while(0)
185
186 #define signal_mapnode(__mn)                    \
187         do {                                    \
188                 if (__mn->waiters) {            \
189                         ta += __mn->waiters;    \
190                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
191                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
192                         __mn->waiters = 0;      \
193                         st_cond_broadcast(__mn->cond);  \
194                 }                               \
195         }while(0)
196
197
198 /* map flags */
199 #define MF_MAP_LOADING          (1 << 0)
200 #define MF_MAP_DESTROYED        (1 << 1)
201 #define MF_MAP_WRITING          (1 << 2)
202 #define MF_MAP_DELETING         (1 << 3)
203 #define MF_MAP_DROPPING_CACHE   (1 << 4)
204 #define MF_MAP_EXCLUSIVE        (1 << 5)
205 #define MF_MAP_OPENING          (1 << 6)
206 #define MF_MAP_CLOSING          (1 << 7)
207 #define MF_MAP_DELETED          (1 << 8)
208 #define MF_MAP_SNAPSHOTTING     (1 << 9)
209
210 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
211                                 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|          \
212                                 MF_MAP_SNAPSHOTTING)
213
214 struct map {
215         uint32_t version;
216         uint32_t flags;
217         uint64_t size;
218         uint32_t volumelen;
219         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
220         xhash_t *objects;       /* obj_index --> map_node */
221         uint32_t ref;
222         uint32_t waiters;
223         st_cond_t cond;
224 };
225
226 struct mapperd {
227         xport bportno;          /* blocker that accesses data */
228         xport mbportno;         /* blocker that accesses maps */
229         xhash_t *hashmaps; // hash_function(target) --> struct map
230 };
231
232 struct mapper_io {
233         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
234         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
235         struct map_node *copyup_node;
236         volatile int err;                       /* error flag */
237         volatile uint64_t del_pending;
238         volatile uint64_t snap_pending;
239         uint64_t delobj;
240         uint64_t dcobj;
241         cb_t cb;
242         enum mapper_state state;
243         volatile int active;
244 };
245
246 /* global vars */
247 struct mapperd *mapper;
248
249 void print_map(struct map *m);
250
251
252 void custom_peer_usage()
253 {
254         fprintf(stderr, "Custom peer options: \n"
255                         "-bp  : port for block blocker(!)\n"
256                         "-mbp : port for map blocker\n"
257                         "\n");
258 }
259
260
261 /*
262  * Helper functions
263  */
264
265 static inline struct mapperd * __get_mapperd(struct peerd *peer)
266 {
267         return (struct mapperd *) peer->priv;
268 }
269
270 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
271 {
272         return (struct mapper_io *) pr->priv;
273 }
274
275 static inline uint64_t calc_map_obj(struct map *map)
276 {
277         if (map->size == -1)
278                 return 0;
279         uint64_t nr_objs = map->size / block_size;
280         if (map->size % block_size)
281                 nr_objs++;
282         return nr_objs;
283 }
284
285 static uint32_t calc_nr_obj(struct xseg_request *req)
286 {
287         unsigned int r = 1;
288         uint64_t rem_size = req->size;
289         uint64_t obj_offset = req->offset & (block_size -1); //modulo
290         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
291         rem_size -= obj_size;
292         while (rem_size > 0) {
293                 obj_size = (rem_size > block_size) ? block_size : rem_size;
294                 rem_size -= obj_size;
295                 r++;
296         }
297
298         return r;
299 }
300
301 /* hexlify function. 
302  * Unsafe. Doesn't check if data length is odd!
303  */
304 static void hexlify(unsigned char *data, char *hex)
305 {
306         int i;
307         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
308                 sprintf(hex+2*i, "%02x", data[i]);
309 }
310
311 static void unhexlify(char *hex, unsigned char *data)
312 {
313         int i;
314         char c;
315         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
316                 data[i] = 0;
317                 c = hex[2*i];
318                 if (isxdigit(c)){
319                         if (isdigit(c)){
320                                 c-= '0';
321                         }
322                         else {
323                                 c = tolower(c);
324                                 c = c-'a' + 10;
325                         }
326                 }
327                 else {
328                         c = 0;
329                 }
330                 data[i] |= (c << 4) & 0xF0;
331                 c = hex[2*i+1];
332                 if (isxdigit(c)){
333                         if (isdigit(c)){
334                                 c-= '0';
335                         }
336                         else {
337                                 c = tolower(c);
338                                 c = c-'a' + 10;
339                         }
340                 }
341                 else {
342                         c = 0;
343                 }
344                 data[i] |= c & 0x0F;
345         }
346 }
347 /*
348  * Maps handling functions
349  */
350
351 static struct map * find_map(struct mapperd *mapper, char *volume)
352 {
353         struct map *m = NULL;
354         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
355                                 (xhashidx *) &m);
356         if (r < 0)
357                 return NULL;
358         return m;
359 }
360
361 static struct map * find_map_len(struct mapperd *mapper, char *target,
362                                         uint32_t targetlen, uint32_t flags)
363 {
364         char buf[XSEG_MAX_TARGETLEN+1];
365         if (flags & MF_ARCHIP){
366                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
367                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
368                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
369                 targetlen += MAPPER_PREFIX_LEN;
370         }
371         else {
372                 strncpy(buf, target, targetlen);
373                 buf[targetlen] = 0;
374         }
375
376         if (targetlen > MAX_VOLUME_LEN){
377                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
378                                         targetlen, MAX_VOLUME_LEN);
379                 return NULL;
380         }
381
382         XSEGLOG2(&lc, D, "looking up map %s, len %u",
383                         buf, targetlen);
384         return find_map(mapper, buf);
385 }
386
387
388 static int insert_map(struct mapperd *mapper, struct map *map)
389 {
390         int r = -1;
391
392         if (find_map(mapper, map->volume)){
393                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
394                 goto out;
395         }
396
397         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
398                         map->volume, strlen(map->volume), (unsigned long) map);
399         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
400         while (r == -XHASH_ERESIZE) {
401                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
402                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
403                 if (!new_hashmap){
404                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
405                                         (unsigned long long) shift);
406                         goto out;
407                 }
408                 mapper->hashmaps = new_hashmap;
409                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
410         }
411 out:
412         return r;
413 }
414
415 static int remove_map(struct mapperd *mapper, struct map *map)
416 {
417         int r = -1;
418
419         //assert no pending pr on map
420
421         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
422         while (r == -XHASH_ERESIZE) {
423                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
424                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
425                 if (!new_hashmap){
426                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
427                                         (unsigned long long) shift);
428                         goto out;
429                 }
430                 mapper->hashmaps = new_hashmap;
431                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
432         }
433 out:
434         return r;
435 }
436
437 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
438 {
439         int r;
440         xport p;
441         struct peerd *peer = pr->peer;
442         struct xseg_request *req;
443         struct mapperd *mapper = __get_mapperd(peer);
444         void *dummy;
445
446         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
447
448         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
449         if (!req){
450                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
451                                 map->volume);
452                 goto out_err;
453         }
454
455         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
456         if (r < 0){
457                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
458                                 map->volume);
459                 goto out_put;
460         }
461
462         char *reqtarget = xseg_get_target(peer->xseg, req);
463         if (!reqtarget)
464                 goto out_put;
465         strncpy(reqtarget, map->volume, req->targetlen);
466         req->op = X_RELEASE;
467         req->size = 0;
468         req->offset = 0;
469         r = xseg_set_req_data(peer->xseg, req, pr);
470         if (r < 0){
471                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
472                                 map->volume);
473                 goto out_put;
474         }
475         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
476         if (p == NoPort){
477                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
478                                 map->volume);
479                 goto out_unset;
480         }
481         r = xseg_signal(peer->xseg, p);
482         map->flags |= MF_MAP_CLOSING;
483
484         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
485         return req;
486
487 out_unset:
488         xseg_get_req_data(peer->xseg, req, &dummy);
489 out_put:
490         xseg_put_request(peer->xseg, req, pr->portno);
491 out_err:
492         return NULL;
493 }
494
495 static int close_map(struct peer_req *pr, struct map *map)
496 {
497         int err;
498         struct xseg_request *req;
499         struct peerd *peer = pr->peer;
500
501         req = __close_map(pr, map);
502         if (!req)
503                 return -1;
504         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
505         map->flags &= ~MF_MAP_CLOSING;
506         err = req->state & XS_FAILED;
507         xseg_put_request(peer->xseg, req, pr->portno);
508         if (err)
509                 return -1;
510         return 0;
511 }
512
513 /*
514 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
515                                 char *target, uint32_t targetlen, struct map **m)
516 {
517         struct mapperd *mapper = __get_mapperd(peer);
518         int r;
519         *m = find_map(mapper, target, targetlen);
520         if (*m) {
521                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
522                 if ((*m)->flags & MF_MAP_NOT_READY) {
523                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
524                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
525                         return MF_PENDING;
526                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
527                 //      return -1;
528                 // 
529                 }else {
530                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
531                         return 0;
532                 }
533         }
534         r = open_map(peer, pr, target, targetlen, 0);
535         if (r < 0)
536                 return -1; //error
537         return MF_PENDING;      
538 }
539 */
540 /*
541  * Object handling functions
542  */
543
544 struct map_node *find_object(struct map *map, uint64_t obj_index)
545 {
546         struct map_node *mn;
547         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
548         if (r < 0)
549                 return NULL;
550         return mn;
551 }
552
553 static int insert_object(struct map *map, struct map_node *mn)
554 {
555         //FIXME no find object first
556         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
557         if (r == -XHASH_ERESIZE) {
558                 unsigned long shift = xhash_grow_size_shift(map->objects);
559                 map->objects = xhash_resize(map->objects, shift, NULL);
560                 if (!map->objects)
561                         return -1;
562                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
563         }
564         return r;
565 }
566
567
568 /*
569  * map read/write functions
570  *
571  * version 0 -> pithos map
572  * version 1 -> archipelago version 1
573  *
574  *
575  * functions
576  *      int read_object(struct map_node *mn, unsigned char *buf)
577  *      int prepare_write_object(struct peer_req *pr, struct map *map,
578  *                              struct map_node *mn, struct xseg_request *req)
579  *      int read_map(struct map *m, unsigned char * data)
580  *      int prepare_write_map(struct peer_req *pr, struct map *map,
581  *                                      struct xseg_request *req)
582  */
583
584 struct map_functions {
585         int (*read_object)(struct map_node *mn, unsigned char *buf);
586         int (*prepare_write_object)(struct peer_req *pr, struct map *map,
587                                 struct map_node *mn, struct xseg_request *req);
588         int (*read_map)(struct map *m, unsigned char * data);
589         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
590                                         struct xseg_request *req);
591 };
592
593 /* version 0 functions */
594
595 /* no header */
596 #define v0_mapheader_size 0
597 /* just the unhexlified name */
598 #define v0_objectsize_in_map SHA256_DIGEST_SIZE
599
600 static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
601 {
602         hexlify(buf, mn->object);
603         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
604         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
605         mn->flags = MF_OBJECT_EXIST;
606
607         return 0;
608 }
609
610 static void v0_object_to_map(struct map_node *mn, unsigned char *data)
611 {
612         unhexlify(mn->object, data);
613 }
614
615 static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
616                         struct map_node *mn, struct xseg_request *req)
617 {
618         struct peerd *peer = pr->peer;
619         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
620         if (r < 0){
621                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
622                                 "(Map: %s [%llu]",
623                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
624                 return -1;
625         }
626         char *target = xseg_get_target(peer->xseg, req);
627         strncpy(target, map->volume, req->targetlen);
628         req->size = req->datalen;
629         req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
630
631         unsigned char *data = xseg_get_data(pr->peer->xseg, req);
632         v0_object_to_map(mn, data);
633         return -1;
634 }
635
636 static int read_map_v0(struct map *m, unsigned char * data)
637 {
638         int r;
639         struct map_node *map_node;
640         uint64_t i;
641         uint64_t pos = 0;
642         uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
643         XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
644         char nulls[SHA256_DIGEST_SIZE];
645         memset(nulls, 0, SHA256_DIGEST_SIZE);
646         map_node = calloc(max_nr_objs, sizeof(struct map_node));
647         if (!map_node)
648                 return -1;
649         for (i = 0; i < max_nr_objs; i++) {
650                 if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
651                         break;
652                 map_node[i].objectidx = i;
653                 map_node[i].map = m;
654                 map_node[i].waiters = 0;
655                 map_node[i].ref = 1;
656                 map_node[i].cond = st_cond_new(); //FIXME err check;
657                 read_object_v0(&map_node[i], data+pos);
658                 pos += v0_objectsize_in_map;
659                 r = insert_object(m, &map_node[i]); //FIXME error check
660         }
661         XSEGLOG2(&lc, D, "Found %llu objects", i);
662         m->size = i * block_size;
663         return 0;
664 }
665
666 static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
667                                 struct xseg_request *req)
668 {
669         struct peerd *peer = pr->peer;
670         uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
671         struct map_node *mn;
672         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
673                         v0_mapheader_size + max_objidx * v0_objectsize_in_map);
674         if (r < 0){
675                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
676                 return -1;
677         }
678         char *target = xseg_get_target(peer->xseg, req);
679         strncpy(target, map->volume, req->targetlen);
680         char *data = xseg_get_data(peer->xseg, req);
681
682         req->op = X_WRITE;
683         req->size = req->datalen;
684         req->offset = 0;
685
686         for (i = 0; i < max_objidx; i++) {
687                 mn = find_object(map, i);
688                 if (!mn){
689                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
690                                         (unsigned long long) i, map->volume);
691                         return -1;
692                 }
693                 v0_object_to_map(mn, (unsigned char *)(data+pos));
694                 pos += v0_objectsize_in_map;
695         }
696         XSEGLOG2(&lc, D, "Prepared %llu objects", i);
697         return 0;
698 }
699
700 /* static struct map_functions map_functions_v0 =       { */
701 /*                      .read_object = read_object_v0, */
702 /*                      .read_map = read_map_v0, */
703 /*                      .prepare_write_object = prepare_write_object_v0, */
704 /*                      .prepare_write_map = prepare_write_map_v0 */
705 /* }; */
706 #define map_functions_v0 {                              \
707                         .read_object = read_object_v0,  \
708                         .read_map = read_map_v0,        \
709                         .prepare_write_object = prepare_write_object_v0,\
710                         .prepare_write_map = prepare_write_map_v0       \
711                         }
712 /* v1 functions */
713
714 /* transparency byte + max object len in disk */
715 #define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
716
717 /* Map header contains:
718  *      map version
719  *      volume size
720  */
721 #define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
722
723 static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
724 {
725         char c = buf[0];
726         mn->flags = 0;
727         if (c){
728                 mn->flags |= MF_OBJECT_EXIST;
729                 strcpy(mn->object, MAPPER_PREFIX);
730                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
731                 mn->object[MAX_OBJECT_LEN] = 0;
732                 mn->objectlen = strlen(mn->object);
733         }
734         else {
735                 mn->flags &= ~MF_OBJECT_EXIST;
736                 hexlify(buf+1, mn->object);
737                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
738                 mn->objectlen = strlen(mn->object);
739         }
740         return 0;
741 }
742
743 static inline void v1_object_to_map(char* buf, struct map_node *mn)
744 {
745         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
746         if (buf[0]){
747                 /* strip common prefix */
748                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
749         }
750         else {
751                 unhexlify(mn->object, (unsigned char *)(buf+1));
752         }
753 }
754
755 static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
756                                 struct map_node *mn, struct xseg_request *req)
757 {
758         struct peerd *peer = pr->peer;
759         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
760         if (r < 0){
761                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
762                                 "(Map: %s [%llu]",
763                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
764                 return -1;
765         }
766         char *target = xseg_get_target(peer->xseg, req);
767         strncpy(target, map->volume, req->targetlen);
768         req->size = req->datalen;
769         req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
770
771         char *data = xseg_get_data(pr->peer->xseg, req);
772         v1_object_to_map(data, mn);
773         return 0;
774 }
775
776 static int read_map_v1(struct map *m, unsigned char * data)
777 {
778         int r;
779         struct map_node *map_node;
780         uint64_t i;
781         uint64_t pos = 0;
782         uint64_t nr_objs;
783
784         /* read header */
785         m->version = *(uint32_t *) (data + pos);
786         pos += sizeof(uint32_t);
787         m->size = *(uint64_t *) (data + pos);
788         pos += sizeof(uint64_t);
789
790         /* read objects */
791         nr_objs = m->size / block_size;
792         if (m->size % block_size)
793                 nr_objs++;
794         map_node = calloc(nr_objs, sizeof(struct map_node));
795         if (!map_node)
796                 return -1;
797
798         for (i = 0; i < nr_objs; i++) {
799                 map_node[i].map = m;
800                 map_node[i].objectidx = i;
801                 map_node[i].waiters = 0;
802                 map_node[i].ref = 1;
803                 map_node[i].cond = st_cond_new(); //FIXME err check;
804                 read_object_v1(&map_node[i], data+pos);
805                 pos += objectsize_in_map;
806                 r = insert_object(m, &map_node[i]); //FIXME error check
807         }
808         return 0;
809 }
810
811 static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
812                                 struct xseg_request *req)
813 {
814         struct peerd *peer = pr->peer;
815         uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
816         struct map_node *mn;
817
818         int r = xseg_prep_request(peer->xseg, req, m->volumelen,
819                         v1_mapheader_size + max_objidx * v1_objectsize_in_map);
820         if (r < 0){
821                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
822                 return -1;
823         }
824         char *target = xseg_get_target(peer->xseg, req);
825         strncpy(target, m->volume, req->targetlen);
826         char *data = xseg_get_data(peer->xseg, req);
827
828         memcpy(data + pos, &m->version, sizeof(m->version));
829         pos += sizeof(m->version);
830         memcpy(data + pos, &m->size, sizeof(m->size));
831         pos += sizeof(m->size);
832
833         req->op = X_WRITE;
834         req->size = req->datalen;
835         req->offset = 0;
836
837         for (i = 0; i < max_objidx; i++) {
838                 mn = find_object(m, i);
839                 if (!mn){
840                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
841                                         (unsigned long long) i, m->volume);
842                         return -1;
843                 }
844                 v1_object_to_map(data+pos, mn);
845                 pos += v1_objectsize_in_map;
846         }
847         return 0;
848 }
849
850 /* static struct map_functions map_functions_v1 =       { */
851 /*                      .read_object = read_object_v1, */
852 /*                      .read_map = read_map_v1, */
853 /*                      .prepare_write_object = prepare_write_object_v1, */
854 /*                      .prepare_write_map = prepare_write_map_v1 */
855 /* }; */
856 #define map_functions_v1 {                              \
857                         .read_object = read_object_v1,  \
858                         .read_map = read_map_v1,        \
859                         .prepare_write_object = prepare_write_object_v1,\
860                         .prepare_write_map = prepare_write_map_v1       \
861                         }
862
863 static struct map_functions map_functions[] = { map_functions_v0,
864                                                 map_functions_v1 };
865 #define MAP_LATEST_VERSION 1
866 /* end of functions */
867
868
869
870
871
872 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
873 {
874         hexlify(buf, mn->object);
875         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
876         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
877         mn->flags = MF_OBJECT_EXIST;
878 }
879
880 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
881 {
882         char c = buf[0];
883         mn->flags = 0;
884         if (c){
885                 mn->flags |= MF_OBJECT_EXIST;
886                 strcpy(mn->object, MAPPER_PREFIX);
887                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
888                 mn->object[MAX_OBJECT_LEN] = 0;
889                 mn->objectlen = strlen(mn->object);
890         }
891         else {
892                 hexlify(buf+1, mn->object);
893                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
894                 mn->objectlen = strlen(mn->object);
895         }
896
897 }
898
899 static inline void object_to_map(char* buf, struct map_node *mn)
900 {
901         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
902         if (buf[0]){
903                 /* strip common prefix */
904                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
905         }
906         else {
907                 unhexlify(mn->object, (unsigned char *)(buf+1));
908         }
909 }
910
911 static inline void mapheader_to_map(struct map *m, char *buf)
912 {
913         uint64_t pos = 0;
914         memcpy(buf + pos, &m->version, sizeof(m->version));
915         pos += sizeof(m->version);
916         memcpy(buf + pos, &m->size, sizeof(m->size));
917         pos += sizeof(m->size);
918 }
919
920
921 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
922                                 struct map *map, struct map_node *mn)
923 {
924         int r;
925         void *dummy;
926         struct mapperd *mapper = __get_mapperd(peer);
927         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
928                                                         mapper->mbportno, X_ALLOC);
929         if (!req){
930                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
931                                 "(Map: %s [%llu]",
932                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
933                 goto out_err;
934         }
935
936         r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
937         if (r < 0){
938                 XSEGLOG2(&lc, E, "Cannot prepare write object");
939                 goto out_put;
940         }
941         req->op = X_WRITE;
942
943         r = xseg_set_req_data(peer->xseg, req, pr);
944         if (r < 0){
945                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
946                                 "(Map: %s [%llu]",
947                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
948                 goto out_put;
949         }
950         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
951         if (p == NoPort){
952                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
953                                 "(Map: %s [%llu]",
954                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
955                 goto out_unset;
956         }
957         r = xseg_signal(peer->xseg, p);
958         if (r < 0)
959                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
960
961         XSEGLOG2(&lc, I, "Writing object %s \n\t"
962                         "Map: %s [%llu]",
963                         mn->object, map->volume, (unsigned long long) mn->objectidx);
964
965         return req;
966
967 out_unset:
968         xseg_get_req_data(peer->xseg, req, &dummy);
969 out_put:
970         xseg_put_request(peer->xseg, req, pr->portno);
971 out_err:
972         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
973                         "(Map: %s [%llu]",
974                         mn->object, map->volume, (unsigned long long) mn->objectidx);
975         return NULL;
976 }
977
978 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
979 {
980         int r;
981         void *dummy;
982         struct peerd *peer = pr->peer;
983         struct mapperd *mapper = __get_mapperd(peer);
984         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
985                                                         mapper->mbportno, X_ALLOC);
986         if (!req){
987                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
988                 goto out_err;
989         }
990
991         r = map_functions[map->version].prepare_write_map(pr, map, req);
992         if (r < 0){
993                 XSEGLOG2(&lc, E, "Cannot prepare write map");
994                 goto out_put;
995         }
996
997         req->op = X_WRITE;
998
999         r = xseg_set_req_data(peer->xseg, req, pr);
1000         if (r < 0){
1001                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1002                                 map->volume);
1003                 goto out_put;
1004         }
1005         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1006         if (p == NoPort){
1007                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1008                                 map->volume);
1009                 goto out_unset;
1010         }
1011         r = xseg_signal(peer->xseg, p);
1012         if (r < 0)
1013                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1014
1015         map->flags |= MF_MAP_WRITING;
1016         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1017         return req;
1018
1019 out_unset:
1020         xseg_get_req_data(peer->xseg, req, &dummy);
1021 out_put:
1022         xseg_put_request(peer->xseg, req, pr->portno);
1023 out_err:
1024         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1025         return NULL;
1026 }
1027
1028 static int write_map(struct peer_req* pr, struct map *map)
1029 {
1030         int r = 0;
1031         struct peerd *peer = pr->peer;
1032         struct xseg_request *req = __write_map(pr, map);
1033         if (!req)
1034                 return -1;
1035         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1036         if (req->state & XS_FAILED)
1037                 r = -1;
1038         xseg_put_request(peer->xseg, req, pr->portno);
1039         map->flags &= ~MF_MAP_WRITING;
1040         return r;
1041 }
1042
1043 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1044 {
1045         int r;
1046         xport p;
1047         struct xseg_request *req;
1048         struct peerd *peer = pr->peer;
1049         struct mapperd *mapper = __get_mapperd(peer);
1050         void *dummy;
1051
1052         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1053
1054         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1055         if (!req){
1056                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1057                                 m->volume);
1058                 goto out_fail;
1059         }
1060
1061         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1062         if (r < 0){
1063                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1064                                 m->volume);
1065                 goto out_put;
1066         }
1067
1068         char *reqtarget = xseg_get_target(peer->xseg, req);
1069         if (!reqtarget)
1070                 goto out_put;
1071         strncpy(reqtarget, m->volume, req->targetlen);
1072         req->op = X_READ;
1073         req->size = block_size;
1074         req->offset = 0;
1075         r = xseg_set_req_data(peer->xseg, req, pr);
1076         if (r < 0){
1077                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1078                                 m->volume);
1079                 goto out_put;
1080         }
1081         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1082         if (p == NoPort){
1083                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1084                                 m->volume);
1085                 goto out_unset;
1086         }
1087         r = xseg_signal(peer->xseg, p);
1088
1089         m->flags |= MF_MAP_LOADING;
1090         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1091         return req;
1092
1093 out_unset:
1094         xseg_get_req_data(peer->xseg, req, &dummy);
1095 out_put:
1096         xseg_put_request(peer->xseg, req, pr->portno);
1097 out_fail:
1098         return NULL;
1099 }
1100
1101 static int read_map (struct map *map, unsigned char *buf)
1102 {
1103         char nulls[SHA256_DIGEST_SIZE];
1104         memset(nulls, 0, SHA256_DIGEST_SIZE);
1105
1106         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1107         if (r) {
1108                 XSEGLOG2(&lc, E, "Read zeros");
1109                 return -1;
1110         }
1111         //type 1, archip type, type 0 pithos map
1112         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1113         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1114         uint32_t version;
1115         if (type)
1116                 version = *(uint32_t *) (buf); //version should always be the first uint32_t
1117         else
1118                 version = 0;
1119         if (version > MAP_LATEST_VERSION){
1120                 XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1121                                 map->volume, version);
1122                 return -1;
1123         }
1124
1125         r = map_functions[version].read_map(map, buf);
1126         if (r < 0){
1127                 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1128                 return -1;
1129         }
1130
1131         print_map(map);
1132         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1133         return 0;
1134
1135 }
1136
1137 static int load_map(struct peer_req *pr, struct map *map)
1138 {
1139         int r = 0;
1140         struct xseg_request *req;
1141         struct peerd *peer = pr->peer;
1142         req = __load_map(pr, map);
1143         if (!req)
1144                 return -1;
1145         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1146         map->flags &= ~MF_MAP_LOADING;
1147         if (req->state & XS_FAILED){
1148                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1149                 xseg_put_request(peer->xseg, req, pr->portno);
1150                 return -1;
1151         }
1152         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1153         xseg_put_request(peer->xseg, req, pr->portno);
1154         return r;
1155 }
1156
1157 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1158                                                 uint32_t flags)
1159 {
1160         int r;
1161         xport p;
1162         struct xseg_request *req;
1163         struct peerd *peer = pr->peer;
1164         struct mapperd *mapper = __get_mapperd(peer);
1165         void *dummy;
1166
1167         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1168
1169         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1170         if (!req){
1171                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1172                                 m->volume);
1173                 goto out_fail;
1174         }
1175
1176         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1177         if (r < 0){
1178                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1179                                 m->volume);
1180                 goto out_put;
1181         }
1182
1183         char *reqtarget = xseg_get_target(peer->xseg, req);
1184         if (!reqtarget)
1185                 goto out_put;
1186         strncpy(reqtarget, m->volume, req->targetlen);
1187         req->op = X_ACQUIRE;
1188         req->size = block_size;
1189         req->offset = 0;
1190         if (!(flags & MF_FORCE))
1191                 req->flags = XF_NOSYNC;
1192         r = xseg_set_req_data(peer->xseg, req, pr);
1193         if (r < 0){
1194                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1195                                 m->volume);
1196                 goto out_put;
1197         }
1198         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1199         if (p == NoPort){ 
1200                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1201                                 m->volume);
1202                 goto out_unset;
1203         }
1204         r = xseg_signal(peer->xseg, p);
1205
1206         m->flags |= MF_MAP_OPENING;
1207         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1208         return req;
1209
1210 out_unset:
1211         xseg_get_req_data(peer->xseg, req, &dummy);
1212 out_put:
1213         xseg_put_request(peer->xseg, req, pr->portno);
1214 out_fail:
1215         return NULL;
1216 }
1217
1218 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1219 {
1220         int err;
1221         struct xseg_request *req;
1222         struct peerd *peer = pr->peer;
1223
1224         req = __open_map(pr, map, flags);
1225         if (!req){
1226                 return -1;
1227         }
1228         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1229         map->flags &= ~MF_MAP_OPENING;
1230         err = req->state & XS_FAILED;
1231         xseg_put_request(peer->xseg, req, pr->portno);
1232         if (err)
1233                 return -1;
1234         else
1235                 map->flags |= MF_MAP_EXCLUSIVE;
1236         return 0;
1237 }
1238
1239 /*
1240  * copy up functions
1241  */
1242
1243 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1244 {
1245         int r = 0;
1246         if (mn){
1247                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1248                                 req, mn, mio);
1249                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1250                 if (r == -XHASH_ERESIZE) {
1251                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1252                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1253                         if (!new_hashmap)
1254                                 goto out;
1255                         mio->copyups_nodes = new_hashmap;
1256                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1257                 }
1258                 if (r < 0)
1259                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1260                                         req, mn, mio);
1261         }
1262         else {
1263                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1264                                 req, mio);
1265                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1266                 if (r == -XHASH_ERESIZE) {
1267                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1268                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1269                         if (!new_hashmap)
1270                                 goto out;
1271                         mio->copyups_nodes = new_hashmap;
1272                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1273                 }
1274                 if (r < 0)
1275                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1276                                         req, mio);
1277         }
1278 out:
1279         return r;
1280 }
1281
1282 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1283 {
1284         struct map_node *mn;
1285         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1286         if (r < 0){
1287                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1288                 return NULL;
1289         }
1290         XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1291         return mn;
1292 }
1293
1294 static struct xseg_request * __snapshot_object(struct peer_req *pr,
1295                                                 struct map_node *mn)
1296 {
1297         struct peerd *peer = pr->peer;
1298         struct mapperd *mapper = __get_mapperd(peer);
1299         struct mapper_io *mio = __get_mapper_io(pr);
1300         //struct map *map = mn->map;
1301         void *dummy;
1302         int r = -1;
1303         xport p;
1304
1305         //assert mn->volume != zero_block
1306         //assert mn->flags & MF_OBJECT_EXIST
1307         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1308                                                 mapper->bportno, X_ALLOC);
1309         if (!req){
1310                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1311                 goto out_err;
1312         }
1313         r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1314                                 sizeof(struct xseg_request_snapshot));
1315         if (r < 0){
1316                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1317                 goto out_put;
1318         }
1319
1320         char *target = xseg_get_target(peer->xseg, req);
1321         strncpy(target, mn->object, req->targetlen);
1322
1323         struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1324         xsnapshot->target[0] = 0;
1325         xsnapshot->targetlen = 0;
1326
1327         req->offset = 0;
1328         req->size = block_size;
1329         req->op = X_SNAPSHOT;
1330         r = xseg_set_req_data(peer->xseg, req, pr);
1331         if (r<0){
1332                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1333                 goto out_put;
1334         }
1335         r = __set_copyup_node(mio, req, mn);
1336         if (r < 0)
1337                 goto out_unset;
1338         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1339         if (p == NoPort) {
1340                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1341                 goto out_mapper_unset;
1342         }
1343         xseg_signal(peer->xseg, p);
1344
1345         mn->flags |= MF_OBJECT_SNAPSHOTTING;
1346         XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1347         return req;
1348
1349 out_mapper_unset:
1350         __set_copyup_node(mio, req, NULL);
1351 out_unset:
1352         xseg_get_req_data(peer->xseg, req, &dummy);
1353 out_put:
1354         xseg_put_request(peer->xseg, req, pr->portno);
1355 out_err:
1356         XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1357         return NULL;
1358 }
1359
1360 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1361 {
1362         struct mapperd *mapper = __get_mapperd(peer);
1363         struct mapper_io *mio = __get_mapper_io(pr);
1364         struct map *map = mn->map;
1365         void *dummy;
1366         int r = -1;
1367         xport p;
1368
1369         uint32_t newtargetlen;
1370         char new_target[MAX_OBJECT_LEN + 1];
1371         unsigned char sha[SHA256_DIGEST_SIZE];
1372
1373         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1374
1375         char tmp[XSEG_MAX_TARGETLEN + 1];
1376         uint32_t tmplen;
1377         strncpy(tmp, map->volume, map->volumelen);
1378         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1379         tmp[XSEG_MAX_TARGETLEN] = 0;
1380         tmplen = strlen(tmp);
1381         SHA256((unsigned char *)tmp, tmplen, sha);
1382         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1383         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1384
1385
1386         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1387                 goto copyup_zeroblock;
1388
1389         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1390                                                 mapper->bportno, X_ALLOC);
1391         if (!req){
1392                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1393                 goto out_err;
1394         }
1395         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1396                                 sizeof(struct xseg_request_copy));
1397         if (r < 0){
1398                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1399                 goto out_put;
1400         }
1401
1402         char *target = xseg_get_target(peer->xseg, req);
1403         strncpy(target, new_target, req->targetlen);
1404
1405         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1406         strncpy(xcopy->target, mn->object, mn->objectlen);
1407         xcopy->targetlen = mn->objectlen;
1408
1409         req->offset = 0;
1410         req->size = block_size;
1411         req->op = X_COPY;
1412         r = xseg_set_req_data(peer->xseg, req, pr);
1413         if (r<0){
1414                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1415                 goto out_put;
1416         }
1417         r = __set_copyup_node(mio, req, mn);
1418         if (r < 0)
1419                 goto out_unset;
1420         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1421         if (p == NoPort) {
1422                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1423                 goto out_mapper_unset;
1424         }
1425         xseg_signal(peer->xseg, p);
1426 //      mio->copyups++;
1427
1428         mn->flags |= MF_OBJECT_COPYING;
1429         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1430         return req;
1431
1432 out_mapper_unset:
1433         __set_copyup_node(mio, req, NULL);
1434 out_unset:
1435         xseg_get_req_data(peer->xseg, req, &dummy);
1436 out_put:
1437         xseg_put_request(peer->xseg, req, pr->portno);
1438 out_err:
1439         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1440         return NULL;
1441
1442 copyup_zeroblock:
1443         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1444                         "Proceeding in writing the new object in map");
1445         /* construct a tmp map_node for writing purposes */
1446         struct map_node newmn = *mn;
1447         newmn.flags = MF_OBJECT_EXIST;
1448         strncpy(newmn.object, new_target, newtargetlen);
1449         newmn.object[newtargetlen] = 0;
1450         newmn.objectlen = newtargetlen;
1451         newmn.objectidx = mn->objectidx; 
1452         req = object_write(peer, pr, map, &newmn);
1453         r = __set_copyup_node(mio, req, mn);
1454         if (r < 0)
1455                 return NULL;
1456         if (!req){
1457                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1458                                 "\n\t of map %s [%llu]",
1459                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1460                 return NULL;
1461         }
1462         mn->flags |= MF_OBJECT_WRITING;
1463         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1464         return req;
1465 }
1466
1467 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1468 {
1469         void *dummy;
1470         struct peerd *peer = pr->peer;
1471         struct mapperd *mapper = __get_mapperd(peer);
1472         struct mapper_io *mio = __get_mapper_io(pr);
1473         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1474                                                         mapper->bportno, X_ALLOC);
1475         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1476         if (!req){
1477                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1478                 goto out_err;
1479         }
1480         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1481         if (r < 0){
1482                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1483                 goto out_put;
1484         }
1485         char *target = xseg_get_target(peer->xseg, req);
1486         strncpy(target, mn->object, req->targetlen);
1487         req->op = X_DELETE;
1488         req->size = req->datalen;
1489         req->offset = 0;
1490         r = xseg_set_req_data(peer->xseg, req, pr);
1491         if (r < 0){
1492                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1493                 goto out_put;
1494         }
1495         r = __set_copyup_node(mio, req, mn);
1496         if (r < 0)
1497                 goto out_unset;
1498         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1499         if (p == NoPort){
1500                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1501                 goto out_mapper_unset;
1502         }
1503         r = xseg_signal(peer->xseg, p);
1504         mn->flags |= MF_OBJECT_DELETING;
1505         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1506         return req;
1507
1508 out_mapper_unset:
1509         __set_copyup_node(mio, req, NULL);
1510 out_unset:
1511         xseg_get_req_data(peer->xseg, req, &dummy);
1512 out_put:
1513         xseg_put_request(peer->xseg, req, pr->portno);
1514 out_err:
1515         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1516         return NULL;
1517 }
1518
1519 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1520 {
1521         void *dummy;
1522         struct peerd *peer = pr->peer;
1523         struct mapperd *mapper = __get_mapperd(peer);
1524         struct mapper_io *mio = __get_mapper_io(pr);
1525         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1526                                                         mapper->mbportno, X_ALLOC);
1527         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1528         if (!req){
1529                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1530                 goto out_err;
1531         }
1532         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1533         if (r < 0){
1534                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1535                 goto out_put;
1536         }
1537         char *target = xseg_get_target(peer->xseg, req);
1538         strncpy(target, map->volume, req->targetlen);
1539         req->op = X_DELETE;
1540         req->size = req->datalen;
1541         req->offset = 0;
1542         r = xseg_set_req_data(peer->xseg, req, pr);
1543         if (r < 0){
1544                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1545                 goto out_put;
1546         }
1547         /* do not check return value. just make sure there is no node set */
1548         __set_copyup_node(mio, req, NULL);
1549         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1550         if (p == NoPort){
1551                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1552                 goto out_unset;
1553         }
1554         r = xseg_signal(peer->xseg, p);
1555         map->flags |= MF_MAP_DELETING;
1556         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1557         return req;
1558
1559 out_unset:
1560         xseg_get_req_data(peer->xseg, req, &dummy);
1561 out_put:
1562         xseg_put_request(peer->xseg, req, pr->portno);
1563 out_err:
1564         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1565         return  NULL;
1566 }
1567
1568
1569 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1570 {
1571         struct map_node *mn = find_object(map, index);
1572         if (mn)
1573                 mn->ref++;
1574         return mn;
1575 }
1576
1577 static inline void put_mapnode(struct map_node *mn)
1578 {
1579         mn->ref--;
1580         if (!mn->ref){
1581                 //clean up mn
1582                 st_cond_destroy(mn->cond);
1583         }
1584 }
1585
1586 static inline void __get_map(struct map *map)
1587 {
1588         map->ref++;
1589 }
1590
1591 static inline void put_map(struct map *map)
1592 {
1593         struct map_node *mn;
1594         map->ref--;
1595         if (!map->ref){
1596                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1597                 //clean up map
1598                 uint64_t i;
1599                 for (i = 0; i < calc_map_obj(map); i++) {
1600                         mn = get_mapnode(map, i);
1601                         if (mn) {
1602                                 //make sure all pending operations on all objects are completed
1603                                 //this should never happen...
1604                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1605                                 mn->flags |= MF_OBJECT_DESTROYED;
1606                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1607                                 put_mapnode(mn); //matcing get_mapnode;
1608                                 //assert mn->ref == 0;
1609                         }
1610                 }
1611                 mn = find_object(map, 0);
1612                 if (mn)
1613                         free(mn);
1614                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1615                 free(map);
1616         }
1617 }
1618
1619 static struct map * create_map(struct mapperd *mapper, char *name,
1620                                 uint32_t namelen, uint32_t flags)
1621 {
1622         int r;
1623         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1624                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1625                                         namelen, MAX_VOLUME_LEN);
1626                 return NULL;
1627         }
1628         struct map *m = malloc(sizeof(struct map));
1629         if (!m){
1630                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1631                 goto out_err;
1632         }
1633         m->size = -1;
1634         if (flags & MF_ARCHIP){
1635                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1636                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1637                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1638                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1639                 m->version = 1; /* keep this hardcoded for now */
1640         }
1641         else {
1642                 strncpy(m->volume, name, namelen);
1643                 m->volume[namelen] = 0;
1644                 m->volumelen = namelen;
1645                 m->version = 0; /* version 0 should be pithos maps */
1646         }
1647         m->flags = 0;
1648         m->objects = xhash_new(3, INTEGER); 
1649         if (!m->objects){
1650                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1651                                 m->volume);
1652                 goto out_map;
1653         }
1654         m->ref = 1;
1655         m->waiters = 0;
1656         m->cond = st_cond_new(); //FIXME err check;
1657         r = insert_map(mapper, m);
1658         if (r < 0){
1659                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1660                 goto out_hash;
1661         }
1662
1663         return m;
1664
1665 out_hash:
1666         xhash_free(m->objects);
1667 out_map:
1668         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1669         free(m);
1670 out_err:
1671         return NULL;
1672 }
1673
1674
1675
1676 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1677 {
1678         struct peerd *peer = pr->peer;
1679         struct mapperd *mapper = __get_mapperd(peer);
1680         (void)mapper;
1681         struct mapper_io *mio = __get_mapper_io(pr);
1682         struct map_node *mn = __get_copyup_node(mio, req);
1683
1684         __set_copyup_node(mio, req, NULL);
1685
1686         //assert req->op = X_DELETE;
1687         //assert pr->req->op = X_DELETE only map deletions make delete requests
1688         //assert mio->del_pending > 0
1689         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1690         mio->del_pending--;
1691
1692         if (req->state & XS_FAILED){
1693                 mio->err = 1;
1694         }
1695         if (mn){
1696                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1697                                 mn, mn->object, mio, req);
1698                 // assert mn->flags & MF_OBJECT_DELETING
1699                 mn->flags &= ~MF_OBJECT_DELETING;
1700                 mn->flags |= MF_OBJECT_DESTROYED;
1701                 signal_mapnode(mn);
1702                 /* put mapnode here, matches get_mapnode on do_destroy */
1703                 put_mapnode(mn);
1704         } else {
1705                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1706                                 mio, req);
1707         }
1708         xseg_put_request(peer->xseg, req, pr->portno);
1709         signal_pr(pr);
1710 }
1711
1712 void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1713 {
1714         struct peerd *peer = pr->peer;
1715         struct mapperd *mapper = __get_mapperd(peer);
1716         (void)mapper;
1717         struct mapper_io *mio = __get_mapper_io(pr);
1718         struct map_node *mn = __get_copyup_node(mio, req);
1719         if (!mn){
1720                 XSEGLOG2(&lc, E, "Cannot get map node");
1721                 goto out_err;
1722         }
1723         __set_copyup_node(mio, req, NULL);
1724
1725         if (req->state & XS_FAILED){
1726                 if (req->op == X_DELETE){
1727                         XSEGLOG2(&lc, E, "Delete req failed");
1728                         goto out_ok;
1729                 }
1730                 XSEGLOG2(&lc, E, "Req failed");
1731                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1732                 mn->flags &= ~MF_OBJECT_WRITING;
1733                 goto out_err;
1734         }
1735
1736         if (req->op == X_WRITE) {
1737                 char old_object_name[MAX_OBJECT_LEN + 1];
1738                 uint32_t old_objectlen;
1739
1740                 char *target = xseg_get_target(peer->xseg, req);
1741                 (void)target;
1742                 //assert mn->flags & MF_OBJECT_WRITING
1743                 mn->flags &= ~MF_OBJECT_WRITING;
1744                 strncpy(old_object_name, mn->object, mn->objectlen);
1745                 old_objectlen = mn->objectlen;
1746
1747                 struct map_node tmp;
1748                 char *data = xseg_get_data(peer->xseg, req);
1749                 map_to_object(&tmp, (unsigned char *) data);
1750                 mn->flags &= ~MF_OBJECT_EXIST;
1751
1752                 strncpy(mn->object, tmp.object, tmp.objectlen);
1753                 mn->object[tmp.objectlen] = 0;
1754                 mn->objectlen = tmp.objectlen;
1755                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1756                 //signal_mapnode since Snapshot was successfull
1757                 signal_mapnode(mn);
1758
1759                 //do delete old object
1760                 strncpy(tmp.object, old_object_name, old_objectlen);
1761                 tmp.object[old_objectlen] = 0;
1762                 tmp.objectlen = old_objectlen;
1763                 tmp.flags = MF_OBJECT_EXIST;
1764                 struct xseg_request *xreq = __delete_object(pr, &tmp);
1765                 if (!xreq){
1766                         //just a warning. Snapshot was successfull
1767                         XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1768                         goto out_ok;
1769                 }
1770                 //overwrite copyup node, since tmp is a stack dummy variable
1771                 __set_copyup_node (mio, xreq, mn);
1772                 XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1773         } else if (req->op == X_SNAPSHOT) {
1774                 //issue write_object;
1775                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1776                 struct map *map = mn->map;
1777                 if (!map){
1778                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1779                         goto out_err;
1780                 }
1781
1782                 /* construct a tmp map_node for writing purposes */
1783                 //char *target = xseg_get_target(peer->xseg, req);
1784                 struct map_node newmn = *mn;
1785                 newmn.flags = 0;
1786                 struct xseg_reply_snapshot *xreply;
1787                 xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1788                 //assert xreply->targetlen !=0
1789                 //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1790                 //xreply->target[xreply->targetlen] = 0;
1791                 //assert xreply->target valid
1792                 strncpy(newmn.object, xreply->target, xreply->targetlen);
1793                 newmn.object[req->targetlen] = 0;
1794                 newmn.objectlen = req->targetlen;
1795                 newmn.objectidx = mn->objectidx;
1796                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1797                 if (!xreq){
1798                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1799                                         "\n\t of map %s [%llu]",
1800                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1801                         goto out_err;
1802                 }
1803                 mn->flags |= MF_OBJECT_WRITING;
1804                 __set_copyup_node (mio, xreq, mn);
1805
1806                 XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1807         } else if (req->op == X_DELETE){
1808                 //deletion of the old block completed
1809                 XSEGLOG2(&lc, I, "Deletion of completed");
1810                 goto out_ok;
1811                 ;
1812         } else {
1813                 //wtf??
1814                 ;
1815         }
1816
1817 out:
1818         xseg_put_request(peer->xseg, req, pr->portno);
1819         return;
1820
1821 out_err:
1822         mio->snap_pending--;
1823         XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1824         mio->err = 1;
1825         if (mn)
1826                 signal_mapnode(mn);
1827         signal_pr(pr);
1828         goto out;
1829
1830 out_ok:
1831         mio->snap_pending--;
1832         signal_pr(pr);
1833         goto out;
1834
1835
1836 }
1837 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1838 {
1839         struct peerd *peer = pr->peer;
1840         struct mapperd *mapper = __get_mapperd(peer);
1841         (void)mapper;
1842         struct mapper_io *mio = __get_mapper_io(pr);
1843         struct map_node *mn = __get_copyup_node(mio, req);
1844         if (!mn){
1845                 XSEGLOG2(&lc, E, "Cannot get map node");
1846                 goto out_err;
1847         }
1848         __set_copyup_node(mio, req, NULL);
1849
1850         if (req->state & XS_FAILED){
1851                 XSEGLOG2(&lc, E, "Req failed");
1852                 mn->flags &= ~MF_OBJECT_COPYING;
1853                 mn->flags &= ~MF_OBJECT_WRITING;
1854                 goto out_err;
1855         }
1856         if (req->op == X_WRITE) {
1857                 char *target = xseg_get_target(peer->xseg, req);
1858                 (void)target;
1859                 //printf("handle object write replyi\n");
1860                 __set_copyup_node(mio, req, NULL);
1861                 //assert mn->flags & MF_OBJECT_WRITING
1862                 mn->flags &= ~MF_OBJECT_WRITING;
1863
1864                 struct map_node tmp;
1865                 char *data = xseg_get_data(peer->xseg, req);
1866                 map_to_object(&tmp, (unsigned char *) data);
1867                 mn->flags |= MF_OBJECT_EXIST;
1868                 if (mn->flags != MF_OBJECT_EXIST){
1869                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1870                         goto out_err;
1871                 }
1872                 //assert mn->flags & MF_OBJECT_EXIST
1873                 strncpy(mn->object, tmp.object, tmp.objectlen);
1874                 mn->object[tmp.objectlen] = 0;
1875                 mn->objectlen = tmp.objectlen;
1876                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1877                 mio->copyups--;
1878                 signal_mapnode(mn);
1879                 signal_pr(pr);
1880         } else if (req->op == X_COPY) {
1881         //      issue write_object;
1882                 mn->flags &= ~MF_OBJECT_COPYING;
1883                 struct map *map = mn->map;
1884                 if (!map){
1885                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1886                         goto out_err;
1887                 }
1888
1889                 /* construct a tmp map_node for writing purposes */
1890                 char *target = xseg_get_target(peer->xseg, req);
1891                 struct map_node newmn = *mn;
1892                 newmn.flags = MF_OBJECT_EXIST;
1893                 strncpy(newmn.object, target, req->targetlen);
1894                 newmn.object[req->targetlen] = 0;
1895                 newmn.objectlen = req->targetlen;
1896                 newmn.objectidx = mn->objectidx; 
1897                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1898                 if (!xreq){
1899                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1900                                         "\n\t of map %s [%llu]",
1901                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1902                         goto out_err;
1903                 }
1904                 mn->flags |= MF_OBJECT_WRITING;
1905                 __set_copyup_node (mio, xreq, mn);
1906
1907                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1908         } else {
1909                 //wtf??
1910                 ;
1911         }
1912
1913 out:
1914         xseg_put_request(peer->xseg, req, pr->portno);
1915         return;
1916
1917 out_err:
1918         mio->copyups--;
1919         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1920         mio->err = 1;
1921         if (mn)
1922                 signal_mapnode(mn);
1923         signal_pr(pr);
1924         goto out;
1925
1926 }
1927
1928 struct r2o {
1929         struct map_node *mn;
1930         uint64_t offset;
1931         uint64_t size;
1932 };
1933
1934 static int req2objs(struct peer_req *pr, struct map *map, int write)
1935 {
1936         int r = 0;
1937         struct peerd *peer = pr->peer;
1938         struct mapper_io *mio = __get_mapper_io(pr);
1939         char *target = xseg_get_target(peer->xseg, pr->req);
1940         uint32_t nr_objs = calc_nr_obj(pr->req);
1941         uint64_t size = sizeof(struct xseg_reply_map) + 
1942                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1943         uint32_t idx, i;
1944         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1945         struct map_node *mn;
1946         mio->copyups = 0;
1947         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1948
1949         /* get map_nodes of request */
1950         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1951         if (!mns){
1952                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1953                 return -1;
1954         }
1955         idx = 0;
1956         rem_size = pr->req->size;
1957         obj_index = pr->req->offset / block_size;
1958         obj_offset = pr->req->offset & (block_size -1); //modulo
1959         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1960         mn = get_mapnode(map, obj_index);
1961         if (!mn) {
1962                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1963                 r = -1;
1964                 goto out;
1965         }
1966         mns[idx].mn = mn;
1967         mns[idx].offset = obj_offset;
1968         mns[idx].size = obj_size;
1969         rem_size -= obj_size;
1970         while (rem_size > 0) {
1971                 idx++;
1972                 obj_index++;
1973                 obj_offset = 0;
1974                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1975                 rem_size -= obj_size;
1976                 mn = get_mapnode(map, obj_index);
1977                 if (!mn) {
1978                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1979                         r = -1;
1980                         goto out;
1981                 }
1982                 mns[idx].mn = mn;
1983                 mns[idx].offset = obj_offset;
1984                 mns[idx].size = obj_size;
1985         }
1986         if (write) {
1987                 int can_wait = 0;
1988                 mio->cb=copyup_cb;
1989                 /* do a first scan and issue as many copyups as we can.
1990                  * then retry and wait when an object is not ready.
1991                  * this could be done better, since now we wait also on the
1992                  * pending copyups
1993                  */
1994                 int j;
1995                 for (j = 0; j < 2 && !mio->err; j++) {
1996                         for (i = 0; i < (idx+1); i++) {
1997                                 mn = mns[i].mn;
1998                                 //do copyups
1999                                 if (mn->flags & MF_OBJECT_NOT_READY){
2000                                         if (!can_wait)
2001                                                 continue;
2002                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2003                                         if (mn->flags & MF_OBJECT_DESTROYED){
2004                                                 mio->err = 1;
2005                                                 continue;
2006                                         }
2007                                 }
2008
2009                                 if (!(mn->flags & MF_OBJECT_EXIST)) {
2010                                         //calc new_target, copy up object
2011                                         if (copyup_object(peer, mn, pr) == NULL){
2012                                                 XSEGLOG2(&lc, E, "Error in copy up object");
2013                                                 mio->err = 1;
2014                                         } else {
2015                                                 mio->copyups++;
2016                                         }
2017                                 }
2018
2019                                 if (mio->err){
2020                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2021                                         break;
2022                                 }
2023                         }
2024                         can_wait = 1;
2025                 }
2026                 wait_on_pr(pr, mio->copyups > 0);
2027         }
2028
2029         if (mio->err){
2030                 r = -1;
2031                 XSEGLOG2(&lc, E, "Mio->err");
2032                 goto out;
2033         }
2034
2035         /* resize request to fit reply */
2036         char buf[XSEG_MAX_TARGETLEN];
2037         strncpy(buf, target, pr->req->targetlen);
2038         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2039         if (r < 0) {
2040                 XSEGLOG2(&lc, E, "Cannot resize request");
2041                 goto out;
2042         }
2043         target = xseg_get_target(peer->xseg, pr->req);
2044         strncpy(target, buf, pr->req->targetlen);
2045
2046         /* structure reply */
2047         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2048         reply->cnt = nr_objs;
2049         for (i = 0; i < (idx+1); i++) {
2050                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2051                 reply->segs[i].targetlen = mns[i].mn->objectlen;
2052                 reply->segs[i].offset = mns[i].offset;
2053                 reply->segs[i].size = mns[i].size;
2054         }
2055 out:
2056         for (i = 0; i < idx; i++) {
2057                 put_mapnode(mns[i].mn);
2058         }
2059         free(mns);
2060         mio->cb = NULL;
2061         return r;
2062 }
2063
2064 static int do_dropcache(struct peer_req *pr, struct map *map)
2065 {
2066         struct map_node *mn;
2067         struct peerd *peer = pr->peer;
2068         struct mapperd *mapper = __get_mapperd(peer);
2069         uint64_t i;
2070         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2071         map->flags |= MF_MAP_DROPPING_CACHE;
2072         for (i = 0; i < calc_map_obj(map); i++) {
2073                 mn = get_mapnode(map, i);
2074                 if (mn) {
2075                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
2076                                 //make sure all pending operations on all objects are completed
2077                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2078                                 mn->flags |= MF_OBJECT_DESTROYED;
2079                         }
2080                         put_mapnode(mn);
2081                 }
2082         }
2083         map->flags &= ~MF_MAP_DROPPING_CACHE;
2084         map->flags |= MF_MAP_DESTROYED;
2085         remove_map(mapper, map);
2086         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2087         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
2088         return 0;
2089 }
2090
2091 static int do_info(struct peer_req *pr, struct map *map)
2092 {
2093         struct peerd *peer = pr->peer;
2094         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2095         xinfo->size = map->size;
2096         return 0;
2097 }
2098
2099
2100 static int do_open(struct peer_req *pr, struct map *map)
2101 {
2102         if (map->flags & MF_MAP_EXCLUSIVE){
2103                 return 0;
2104         }
2105         else {
2106                 return -1;
2107         }
2108 }
2109
2110 static int do_close(struct peer_req *pr, struct map *map)
2111 {
2112         if (map->flags & MF_MAP_EXCLUSIVE){
2113                 /* do not drop cache if close failed and map not deleted */
2114                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2115                         return -1;
2116         }
2117         return do_dropcache(pr, map);
2118 }
2119
2120 static int do_snapshot(struct peer_req *pr, struct map *map)
2121 {
2122         uint64_t i;
2123         struct peerd *peer = pr->peer;
2124         struct mapper_io *mio = __get_mapper_io(pr);
2125         struct map_node *mn;
2126         struct xseg_request *req;
2127
2128         if (!(map->flags & MF_MAP_EXCLUSIVE)){
2129                 XSEGLOG2(&lc, E, "Map was not opened exclusively");
2130                 return -1;
2131         }
2132         XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2133         map->flags |= MF_MAP_SNAPSHOTTING;
2134
2135         uint64_t nr_obj = calc_map_obj(map);
2136         mio->cb = snapshot_cb;
2137         mio->snap_pending = 0;
2138         mio->err = 0;
2139         for (i = 0; i < nr_obj; i++){
2140
2141                 /* throttle pending snapshots
2142                  * this should be nr_ops of the blocker, but since we don't know
2143                  * that, we assume based on our own nr_ops
2144                  */
2145                 wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2146
2147                 mn = get_mapnode(map, i);
2148                 if (!mn)
2149                         //warning?
2150                         continue;
2151                 if (!(mn->flags & MF_OBJECT_EXIST)){
2152                         put_mapnode(mn);
2153                         continue;
2154                 }
2155                 // make sure all pending operations on all objects are completed
2156                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2157
2158                 /* TODO will this ever happen?? */
2159                 if (mn->flags & MF_OBJECT_DESTROYED){
2160                         put_mapnode(mn);
2161                         continue;
2162                 }
2163
2164                 req = __snapshot_object(pr, mn);
2165                 if (!req){
2166                         mio->err = 1;
2167                         put_mapnode(mn);
2168                         break;
2169                 }
2170                 mio->snap_pending++;
2171                 /* do not put_mapnode here. cb does that */
2172         }
2173
2174         wait_on_pr(pr, mio->snap_pending > 0);
2175         mio->cb = NULL;
2176
2177         if (mio->err)
2178                 goto out_err;
2179
2180         /* calculate name of snapshot */
2181         struct map tmp_map = *map;
2182         unsigned char sha[SHA256_DIGEST_SIZE];
2183         unsigned char *buf = malloc(block_size);
2184         char newvolumename[MAX_VOLUME_LEN];
2185         uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2186         uint64_t pos = 0;
2187         uint64_t max_objidx = calc_map_obj(map);
2188         int r;
2189
2190         for (i = 0; i < max_objidx; i++) {
2191                 mn = find_object(map, i);
2192                 if (!mn){
2193                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2194                                         (unsigned long long) i, map->volume);
2195                         goto out_err;
2196                 }
2197                 v0_object_to_map(mn, buf+pos);
2198                 pos += v0_objectsize_in_map;
2199         }
2200         SHA256(buf, pos, sha);
2201         hexlify(sha, newvolumename);
2202         strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2203         tmp_map.volumelen = newvolumenamelen;
2204         free(buf);
2205         tmp_map.version = 0; // set volume version to pithos image
2206
2207         /* write the map of the Snapshot */
2208         r = write_map(pr, &tmp_map);
2209         if (r < 0)
2210                 goto out_err;
2211         char targetbuf[XSEG_MAX_TARGETLEN];
2212         char *target = xseg_get_target(peer->xseg, pr->req);
2213         strncpy(targetbuf, target, pr->req->targetlen);
2214         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2215                         sizeof(struct xseg_reply_snapshot));
2216         if (r < 0){
2217                 XSEGLOG2(&lc, E, "Cannot resize request");
2218                 goto out_err;
2219         }
2220         target = xseg_get_target(peer->xseg, pr->req);
2221         strncpy(target, targetbuf, pr->req->targetlen);
2222
2223         struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2224                                                 xseg_get_data(peer->xseg, pr->req);
2225         strncpy(xreply->target, newvolumename, newvolumenamelen);
2226         xreply->targetlen = newvolumenamelen;
2227         map->flags &= ~MF_MAP_SNAPSHOTTING;
2228         XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2229         return 0;
2230
2231 out_err:
2232         map->flags &= ~MF_MAP_SNAPSHOTTING;
2233         XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2234         return -1;
2235 }
2236
2237
2238 static int do_destroy(struct peer_req *pr, struct map *map)
2239 {
2240         uint64_t i;
2241         struct peerd *peer = pr->peer;
2242         struct mapper_io *mio = __get_mapper_io(pr);
2243         struct map_node *mn;
2244         struct xseg_request *req;
2245
2246         if (!(map->flags & MF_MAP_EXCLUSIVE))
2247                 return -1;
2248
2249         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
2250         req = __delete_map(pr, map);
2251         if (!req)
2252                 return -1;
2253         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
2254         if (req->state & XS_FAILED){
2255                 xseg_put_request(peer->xseg, req, pr->portno);
2256                 map->flags &= ~MF_MAP_DELETING;
2257                 return -1;
2258         }
2259         xseg_put_request(peer->xseg, req, pr->portno);
2260
2261         uint64_t nr_obj = calc_map_obj(map);
2262         mio->cb = deletion_cb;
2263         mio->del_pending = 0;
2264         mio->err = 0;
2265         for (i = 0; i < nr_obj; i++){
2266
2267                 /* throttle pending deletions
2268                  * this should be nr_ops of the blocker, but since we don't know
2269                  * that, we assume based on our own nr_ops
2270                  */
2271                 wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
2272
2273                 mn = get_mapnode(map, i);
2274                 if (!mn)
2275                         continue;
2276                 if (mn->flags & MF_OBJECT_DESTROYED){
2277                         put_mapnode(mn);
2278                         continue;
2279                 }
2280                 if (!(mn->flags & MF_OBJECT_EXIST)){
2281                         mn->flags |= MF_OBJECT_DESTROYED;
2282                         put_mapnode(mn);
2283                         continue;
2284                 }
2285
2286                 // make sure all pending operations on all objects are completed
2287                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2288
2289                 req = __delete_object(pr, mn);
2290                 if (!req){
2291                         mio->err = 1;
2292                         put_mapnode(mn);
2293                         continue;
2294                 }
2295                 mio->del_pending++;
2296                 /* do not put_mapnode here. cb does that */
2297         }
2298
2299         wait_on_pr(pr, mio->del_pending > 0);
2300
2301         mio->cb = NULL;
2302         map->flags &= ~MF_MAP_DELETING;
2303         map->flags |= MF_MAP_DELETED;
2304         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
2305         return do_close(pr, map);
2306 }
2307
2308 static int do_mapr(struct peer_req *pr, struct map *map)
2309 {
2310         struct peerd *peer = pr->peer;
2311         int r = req2objs(pr, map, 0);
2312         if  (r < 0){
2313                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2314                                 map->volume, 
2315                                 (unsigned long long) pr->req->offset, 
2316                                 (unsigned long long) (pr->req->offset + pr->req->size));
2317                 return -1;
2318         }
2319         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2320                         map->volume, 
2321                         (unsigned long long) pr->req->offset, 
2322                         (unsigned long long) (pr->req->offset + pr->req->size));
2323         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2324                         (unsigned long long) pr->req->offset,
2325                         (unsigned long long) pr->req->size);
2326         char buf[XSEG_MAX_TARGETLEN+1];
2327         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2328         int i;
2329         for (i = 0; i < reply->cnt; i++) {
2330                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2331                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2332                 buf[reply->segs[i].targetlen] = 0;
2333                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2334                                 (unsigned long long) reply->segs[i].offset,
2335                                 (unsigned long long) reply->segs[i].size);
2336         }
2337         return 0;
2338 }
2339
2340 static int do_mapw(struct peer_req *pr, struct map *map)
2341 {
2342         struct peerd *peer = pr->peer;
2343         int r = req2objs(pr, map, 1);
2344         if  (r < 0){
2345                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2346                                 map->volume, 
2347                                 (unsigned long long) pr->req->offset, 
2348                                 (unsigned long long) (pr->req->offset + pr->req->size));
2349                 return -1;
2350         }
2351         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2352                         map->volume, 
2353                         (unsigned long long) pr->req->offset, 
2354                         (unsigned long long) (pr->req->offset + pr->req->size));
2355         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2356                         (unsigned long long) pr->req->offset,
2357                         (unsigned long long) pr->req->size);
2358         char buf[XSEG_MAX_TARGETLEN+1];
2359         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2360         int i;
2361         for (i = 0; i < reply->cnt; i++) {
2362                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2363                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2364                 buf[reply->segs[i].targetlen] = 0;
2365                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2366                                 (unsigned long long) reply->segs[i].offset,
2367                                 (unsigned long long) reply->segs[i].size);
2368         }
2369         return 0;
2370 }
2371
2372 //here map is the parent map
2373 static int do_clone(struct peer_req *pr, struct map *map)
2374 {
2375         int r;
2376         struct peerd *peer = pr->peer;
2377         struct mapperd *mapper = __get_mapperd(peer);
2378         char *target = xseg_get_target(peer->xseg, pr->req);
2379         struct map *clonemap;
2380         struct xseg_request_clone *xclone =
2381                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2382
2383         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
2384
2385         clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2386         if (!clonemap)
2387                 return -1;
2388
2389         /* open map to get exclusive access to map */
2390         r = open_map(pr, clonemap, 0);
2391         if (r < 0){
2392                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
2393                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2394                 goto out_err;
2395         }
2396         r = load_map(pr, clonemap);
2397         if (r >= 0) {
2398                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2399                 goto out_err;
2400         }
2401
2402         if (xclone->size == -1)
2403                 clonemap->size = map->size;
2404         else
2405                 clonemap->size = xclone->size;
2406         if (clonemap->size < map->size){
2407                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
2408                                 "\n\t for requested clone %s",
2409                                 (unsigned long long) xclone->size,
2410                                 (unsigned long long) map->size, clonemap->volume);
2411                 goto out_err;
2412         }
2413         if (clonemap->size > MAX_VOLUME_SIZE) {
2414                 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
2415                                 "\n\t for volume %s",
2416                                 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
2417                 goto out_err;
2418         }
2419
2420         //alloc and init map_nodes
2421         //unsigned long c = clonemap->size/block_size + 1;
2422         unsigned long c = calc_map_obj(clonemap);
2423         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
2424         if (!map_nodes){
2425                 goto out_err;
2426         }
2427         int i;
2428         //for (i = 0; i < clonemap->size/block_size + 1; i++) {
2429         for (i = 0; i < c; i++) {
2430                 struct map_node *mn = get_mapnode(map, i);
2431                 if (mn) {
2432                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
2433                         map_nodes[i].objectlen = mn->objectlen;
2434                         put_mapnode(mn);
2435                 } else {
2436                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2437                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2438                 }
2439                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2440                 map_nodes[i].flags = 0;
2441                 map_nodes[i].objectidx = i;
2442                 map_nodes[i].map = clonemap;
2443                 map_nodes[i].ref = 1;
2444                 map_nodes[i].waiters = 0;
2445                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2446                 r = insert_object(clonemap, &map_nodes[i]);
2447                 if (r < 0){
2448                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
2449                         goto out_err;
2450                 }
2451         }
2452
2453         r = write_map(pr, clonemap);
2454         if (r < 0){
2455                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
2456                 goto out_err;
2457         }
2458         do_close(pr, clonemap);
2459         return 0;
2460
2461 out_err:
2462         do_close(pr, clonemap);
2463         return -1;
2464 }
2465
2466 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2467 {
2468         int r, opened = 0;
2469         if (flags & MF_EXCLUSIVE){
2470                 r = open_map(pr, map, flags);
2471                 if (r < 0) {
2472                         if (flags & MF_FORCE){
2473                                 return -1;
2474                         }
2475                 } else {
2476                         opened = 1;
2477                 }
2478         }
2479         r = load_map(pr, map);
2480         if (r < 0 && opened){
2481                 close_map(pr, map);
2482         }
2483         return r;
2484 }
2485
2486 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2487                         uint32_t flags)
2488 {
2489         int r;
2490         struct peerd *peer = pr->peer;
2491         struct mapperd *mapper = __get_mapperd(peer);
2492         struct map *map = find_map_len(mapper, name, namelen, flags);
2493         if (!map){
2494                 if (flags & MF_LOAD){
2495                         map = create_map(mapper, name, namelen, flags);
2496                         if (!map)
2497                                 return NULL;
2498                         r = open_load_map(pr, map, flags);
2499                         if (r < 0){
2500                                 do_dropcache(pr, map);
2501                                 return NULL;
2502                         }
2503                 } else {
2504                         return NULL;
2505                 }
2506         } else if (map->flags & MF_MAP_DESTROYED){
2507                 return NULL;
2508         }
2509         __get_map(map);
2510         return map;
2511
2512 }
2513
2514 static int map_action(int (action)(struct peer_req *pr, struct map *map),
2515                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2516 {
2517         //struct peerd *peer = pr->peer;
2518         struct map *map;
2519 start:
2520         map = get_map(pr, name, namelen, flags);
2521         if (!map)
2522                 return -1;
2523         if (map->flags & MF_MAP_NOT_READY){
2524                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2525                 put_map(map);
2526                 goto start;
2527         }
2528         int r = action(pr, map);
2529         //always drop cache if map not read exclusively
2530         if (!(map->flags & MF_MAP_EXCLUSIVE))
2531                 do_dropcache(pr, map);
2532         signal_map(map);
2533         put_map(map);
2534         return r;
2535 }
2536
2537 void * handle_info(struct peer_req *pr)
2538 {
2539         struct peerd *peer = pr->peer;
2540         char *target = xseg_get_target(peer->xseg, pr->req);
2541         int r = map_action(do_info, pr, target, pr->req->targetlen,
2542                                 MF_ARCHIP|MF_LOAD);
2543         if (r < 0)
2544                 fail(peer, pr);
2545         else
2546                 complete(peer, pr);
2547         ta--;
2548         return NULL;
2549 }
2550
2551 void * handle_clone(struct peer_req *pr)
2552 {
2553         int r;
2554         struct peerd *peer = pr->peer;
2555         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2556         if (!xclone) {
2557                 r = -1;
2558                 goto out;
2559         }
2560
2561         if (xclone->targetlen){
2562                 /* if snap was defined */
2563                 //support clone only from pithos
2564                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
2565                                         MF_LOAD);
2566         } else {
2567                 /* else try to create a new volume */
2568                 XSEGLOG2(&lc, I, "Creating volume");
2569                 if (!xclone->size){
2570                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
2571                         r = -1;
2572                         goto out;
2573                 }
2574                 if (xclone->size > MAX_VOLUME_SIZE) {
2575                         XSEGLOG2(&lc, E, "Requested size %llu > max volume "
2576                                         "size %llu", xclone->size, MAX_VOLUME_SIZE);
2577                         r = -1;
2578                         goto out;
2579                 }
2580
2581                 struct map *map;
2582                 char *target = xseg_get_target(peer->xseg, pr->req);
2583
2584                 //create a new empty map of size
2585                 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2586                 if (!map){
2587                         r = -1;
2588                         goto out;
2589                 }
2590                 /* open map to get exclusive access to map */
2591                 r = open_map(pr, map, 0);
2592                 if (r < 0){
2593                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
2594                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
2595                         do_dropcache(pr, map);
2596                         r = -1;
2597                         goto out;
2598                 }
2599                 r = load_map(pr, map);
2600                 if (r >= 0) {
2601                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
2602                         do_close(pr, map);
2603                         r = -1;
2604                         goto out;
2605                 }
2606                 map->size = xclone->size;
2607                 //populate_map with zero objects;
2608                 uint64_t nr_objs = xclone->size / block_size;
2609                 if (xclone->size % block_size)
2610                         nr_objs++;
2611
2612                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2613                 if (!map_nodes){
2614                         do_close(pr, map);
2615                         r = -1;
2616                         goto out;
2617                 }
2618
2619                 uint64_t i;
2620                 for (i = 0; i < nr_objs; i++) {
2621                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2622                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2623                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2624                         map_nodes[i].flags = 0;
2625                         map_nodes[i].objectidx = i;
2626                         map_nodes[i].map = map;
2627                         map_nodes[i].ref = 1;
2628                         map_nodes[i].waiters = 0;
2629                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2630                         r = insert_object(map, &map_nodes[i]);
2631                         if (r < 0){
2632                                 do_close(pr, map);
2633                                 r = -1;
2634                                 goto out;
2635                         }
2636                 }
2637                 r = write_map(pr, map);
2638                 if (r < 0){
2639                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2640                         do_close(pr, map);
2641                         goto out;
2642                 }
2643                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2644                 r = 0;
2645                 do_close(pr, map); //drop cache here for consistency
2646         }
2647 out:
2648         if (r < 0)
2649                 fail(peer, pr);
2650         else
2651                 complete(peer, pr);
2652         ta--;
2653         return NULL;
2654 }
2655
2656 void * handle_mapr(struct peer_req *pr)
2657 {
2658         struct peerd *peer = pr->peer;
2659         char *target = xseg_get_target(peer->xseg, pr->req);
2660         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2661                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2662         if (r < 0)
2663                 fail(peer, pr);
2664         else
2665                 complete(peer, pr);
2666         ta--;
2667         return NULL;
2668 }
2669
2670 void * handle_mapw(struct peer_req *pr)
2671 {
2672         struct peerd *peer = pr->peer;
2673         char *target = xseg_get_target(peer->xseg, pr->req);
2674         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2675                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2676         if (r < 0)
2677                 fail(peer, pr);
2678         else
2679                 complete(peer, pr);
2680         XSEGLOG2(&lc, D, "Ta: %d", ta);
2681         ta--;
2682         return NULL;
2683 }
2684
2685 void * handle_destroy(struct peer_req *pr)
2686 {
2687         struct peerd *peer = pr->peer;
2688         char *target = xseg_get_target(peer->xseg, pr->req);
2689         /* request EXCLUSIVE access, but do not force it.
2690          * check if succeeded on do_destroy
2691          */
2692         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2693                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2694         if (r < 0)
2695                 fail(peer, pr);
2696         else
2697                 complete(peer, pr);
2698         ta--;
2699         return NULL;
2700 }
2701
2702 void * handle_open(struct peer_req *pr)
2703 {
2704         struct peerd *peer = pr->peer;
2705         char *target = xseg_get_target(peer->xseg, pr->req);
2706         //here we do not want to load
2707         int r = map_action(do_open, pr, target, pr->req->targetlen,
2708                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2709         if (r < 0)
2710                 fail(peer, pr);
2711         else
2712                 complete(peer, pr);
2713         ta--;
2714         return NULL;
2715 }
2716
2717 void * handle_close(struct peer_req *pr)
2718 {
2719         struct peerd *peer = pr->peer;
2720         char *target = xseg_get_target(peer->xseg, pr->req);
2721         //here we do not want to load
2722         int r = map_action(do_close, pr, target, pr->req->targetlen,
2723                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2724         if (r < 0)
2725                 fail(peer, pr);
2726         else
2727                 complete(peer, pr);
2728         ta--;
2729         return NULL;
2730 }
2731
2732 void * handle_snapshot(struct peer_req *pr)
2733 {
2734         struct peerd *peer = pr->peer;
2735         char *target = xseg_get_target(peer->xseg, pr->req);
2736         /* request EXCLUSIVE access, but do not force it.
2737          * check if succeeded on do_destroy
2738          */
2739         int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2740                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2741         if (r < 0)
2742                 fail(peer, pr);
2743         else
2744                 complete(peer, pr);
2745         ta--;
2746         return NULL;
2747 }
2748
2749 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2750                         struct xseg_request *req)
2751 {
2752         //struct mapperd *mapper = __get_mapperd(peer);
2753         struct mapper_io *mio = __get_mapper_io(pr);
2754         void *(*action)(struct peer_req *) = NULL;
2755
2756         mio->state = ACCEPTED;
2757         mio->err = 0;
2758         mio->cb = NULL;
2759         switch (pr->req->op) {
2760                 /* primary xseg operations of mapper */
2761                 case X_CLONE: action = handle_clone; break;
2762                 case X_MAPR: action = handle_mapr; break;
2763                 case X_MAPW: action = handle_mapw; break;
2764                 case X_SNAPSHOT: action = handle_snapshot; break;
2765                 case X_INFO: action = handle_info; break;
2766                 case X_DELETE: action = handle_destroy; break;
2767                 case X_OPEN: action = handle_open; break;
2768                 case X_CLOSE: action = handle_close; break;
2769                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2770         }
2771         if (action){
2772                 ta++;
2773                 mio->active = 1;
2774                 st_thread_create(action, pr, 0, 0);
2775         }
2776         return 0;
2777
2778 }
2779
2780 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2781                 enum dispatch_reason reason)
2782 {
2783         struct mapperd *mapper = __get_mapperd(peer);
2784         (void) mapper;
2785         struct mapper_io *mio = __get_mapper_io(pr);
2786         (void) mio;
2787
2788
2789         if (reason == dispatch_accept)
2790                 dispatch_accepted(peer, pr, req);
2791         else {
2792                 if (mio->cb){
2793                         mio->cb(pr, req);
2794                 } else { 
2795                         signal_pr(pr);
2796                 }
2797         }
2798         return 0;
2799 }
2800
2801 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2802 {
2803         int i;
2804
2805         //FIXME error checks
2806         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2807         peer->priv = mapperd;
2808         mapper = mapperd;
2809         mapper->hashmaps = xhash_new(3, STRING);
2810
2811         for (i = 0; i < peer->nr_ops; i++) {
2812                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2813                 mio->copyups_nodes = xhash_new(3, INTEGER);
2814                 mio->copyups = 0;
2815                 mio->err = 0;
2816                 mio->active = 0;
2817                 peer->peer_reqs[i].priv = mio;
2818         }
2819
2820         mapper->bportno = -1;
2821         mapper->mbportno = -1;
2822         BEGIN_READ_ARGS(argc, argv);
2823         READ_ARG_ULONG("-bp", mapper->bportno);
2824         READ_ARG_ULONG("-mbp", mapper->mbportno);
2825         END_READ_ARGS();
2826         if (mapper->bportno == -1){
2827                 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2828                 usage(argv[0]);
2829                 return -1;
2830         }
2831         if (mapper->mbportno == -1){
2832                 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2833                 usage(argv[0]);
2834                 return -1;
2835         }
2836
2837         const struct sched_param param = { .sched_priority = 99 };
2838         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2839         /* FIXME maybe place it in peer
2840          * should be done for each port (sportno to eportno)
2841          */
2842         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2843         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2844
2845
2846 //      test_map(peer);
2847
2848         return 0;
2849 }
2850
2851 /* FIXME this should not be here */
2852 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2853 {
2854         struct xseg *xseg = peer->xseg;
2855         xport portno_start = peer->portno_start;
2856         xport portno_end = peer->portno_end;
2857         struct peer_req *pr;
2858         xport i;
2859         int  r, c = 0;
2860         struct xseg_request *received;
2861         xseg_prepare_wait(xseg, portno_start);
2862         while(1) {
2863                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2864                 c = 1;
2865                 while (c){
2866                         c = 0;
2867                         for (i = portno_start; i <= portno_end; i++) {
2868                                 received = xseg_receive(xseg, i, 0);
2869                                 if (received) {
2870                                         c = 1;
2871                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2872                                         if (r < 0 || !pr || received != expected_req){
2873                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2874                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2875                                                 if (p == NoPort){
2876                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2877                                                         xseg_put_request(xseg, received, portno_start);
2878                                                         continue;
2879                                                 } else {
2880                                                         xseg_signal(xseg, p);
2881                                                 }
2882                                         } else {
2883                                                 xseg_cancel_wait(xseg, portno_start);
2884                                                 return 0;
2885                                         }
2886                                 }
2887                         }
2888                 }
2889                 xseg_wait_signal(xseg, 1000000UL);
2890         }
2891 }
2892
2893
2894 void custom_peer_finalize(struct peerd *peer)
2895 {
2896         struct mapperd *mapper = __get_mapperd(peer);
2897         struct peer_req *pr = alloc_peer_req(peer);
2898         if (!pr){
2899                 XSEGLOG2(&lc, E, "Cannot get peer request");
2900                 return;
2901         }
2902         struct map *map;
2903         struct xseg_request *req;
2904         xhash_iter_t it;
2905         xhashidx key, val;
2906         xhash_iter_init(mapper->hashmaps, &it);
2907         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2908                 map = (struct map *)val;
2909                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2910                         continue;
2911                 req = __close_map(pr, map);
2912                 if (!req)
2913                         continue;
2914                 wait_reply(peer, req);
2915                 if (!(req->state & XS_SERVED))
2916                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2917                 map->flags &= ~MF_MAP_CLOSING;
2918                 xseg_put_request(peer->xseg, req, pr->portno);
2919         }
2920         return;
2921
2922
2923 }
2924
2925 void print_obj(struct map_node *mn)
2926 {
2927         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2928                         (unsigned long long) mn->objectidx, mn->object, 
2929                         (unsigned int) mn->objectlen, 
2930                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2931 }
2932
2933 void print_map(struct map *m)
2934 {
2935         uint64_t nr_objs = m->size/block_size;
2936         if (m->size % block_size)
2937                 nr_objs++;
2938         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2939                         m->volume, m->volumelen, 
2940                         (unsigned long long) m->size, 
2941                         (unsigned long long) nr_objs,
2942                         m->version);
2943         uint64_t i;
2944         struct map_node *mn;
2945         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2946                 return;
2947         for (i = 0; i < nr_objs; i++) {
2948                 mn = find_object(m, i);
2949                 if (!mn){
2950                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2951                         continue;
2952                 }
2953                 print_obj(mn);
2954         }
2955 }
2956
2957 /*
2958 void test_map(struct peerd *peer)
2959 {
2960         int i,j, ret;
2961         //struct sha256_ctx sha256ctx;
2962         unsigned char buf[SHA256_DIGEST_SIZE];
2963         char buf_new[XSEG_MAX_TARGETLEN + 20];
2964         struct map *m = malloc(sizeof(struct map));
2965         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2966         m->volume[XSEG_MAX_TARGETLEN] = 0;
2967         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2968         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2969         m->volumelen = XSEG_MAX_TARGETLEN;
2970         m->size = 100*block_size;
2971         m->objects = xhash_new(3, INTEGER);
2972         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2973         for (i = 0; i < 100; i++) {
2974                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2975                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2976                 
2977                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2978                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2979                 }
2980                 map_node[i].objectidx = i;
2981                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2982                 map_node[i].flags = MF_OBJECT_EXIST;
2983                 ret = insert_object(m, &map_node[i]);
2984         }
2985
2986         char *data = malloc(block_size);
2987         mapheader_to_map(m, data);
2988         uint64_t pos = mapheader_size;
2989
2990         for (i = 0; i < 100; i++) {
2991                 map_node = find_object(m, i);
2992                 if (!map_node){
2993                         printf("no object node %d \n", i);
2994                         exit(1);
2995                 }
2996                 object_to_map(data+pos, map_node);
2997                 pos += objectsize_in_map;
2998         }
2999 //      print_map(m);
3000
3001         struct map *m2 = malloc(sizeof(struct map));
3002         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3003         m->volume[XSEG_MAX_TARGETLEN] = 0;
3004         m->volumelen = XSEG_MAX_TARGETLEN;
3005
3006         m2->objects = xhash_new(3, INTEGER);
3007         ret = read_map(peer, m2, data);
3008 //      print_map(m2);
3009
3010         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3011         ssize_t r, sum = 0;
3012         while (sum < block_size) {
3013                 r = write(fd, data + sum, block_size -sum);
3014                 if (r < 0){
3015                         perror("write");
3016                         printf("write error\n");
3017                         exit(1);
3018                 } 
3019                 sum += r;
3020         }
3021         close(fd);
3022         map_node = find_object(m, 0);
3023         free(map_node);
3024         free(m);
3025 }
3026 */