Merge branch 'hotfix-0.3.5'
[archipelago] / xseg / peers / user / mt-mapperd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <pthread.h>
39 #include <xseg/xseg.h>
40 #include <peer.h>
41 #include <time.h>
42 #include <xtypes/xlock.h>
43 #include <xtypes/xhash.h>
44 #include <xseg/protocol.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <sched.h>
49 #include <sys/syscall.h>
50 #include <openssl/sha.h>
51 #include <ctype.h>
52
53 /* general mapper flags */
54 #define MF_LOAD         (1 << 0)
55 #define MF_EXCLUSIVE    (1 << 1)
56 #define MF_FORCE        (1 << 2)
57 #define MF_ARCHIP       (1 << 3)
58
59 #ifndef SHA256_DIGEST_SIZE
60 #define SHA256_DIGEST_SIZE 32
61 #endif
62 /* hex representation of sha256 value takes up double the sha256 size */
63 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64
65 #define block_size (1<<22) //FIXME this should be defined here?
66
67 /* transparency byte + max object len in disk */
68 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69
70 /* Map header contains:
71  *      map version
72  *      volume size
73  */
74 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75
76
77 #define MAPPER_PREFIX "archip_"
78 #define MAPPER_PREFIX_LEN 7
79
80 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82
83 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85 #endif
86
87 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88
89 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91 #endif
92
93 #define MAX_VOLUME_SIZE \
94 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95
96
97 //char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98
99 /* pithos considers this a block full of zeros, so should we.
100  * it is actually the sha256 hash of nothing.
101  */
102 char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104
105 /* dispatch_internal mapper states */
106 enum mapper_state {
107         ACCEPTED = 0,
108         WRITING = 1,
109         COPYING = 2,
110         DELETING = 3,
111         DROPPING_CACHE = 4
112 };
113
114 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115
116
117 /* mapper object flags */
118 #define MF_OBJECT_EXIST         (1 << 0)
119 #define MF_OBJECT_COPYING       (1 << 1)
120 #define MF_OBJECT_WRITING       (1 << 2)
121 #define MF_OBJECT_DELETING      (1 << 3)
122 #define MF_OBJECT_DESTROYED     (1 << 5)
123 #define MF_OBJECT_SNAPSHOTTING  (1 << 6)
124
125 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126                                 MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127 struct map_node {
128         uint32_t flags;
129         uint32_t objectidx;
130         uint32_t objectlen;
131         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
132         struct map *map;
133         uint32_t ref;
134         uint32_t waiters;
135         st_cond_t cond;
136 };
137
138
139 #define wait_on_pr(__pr, __condition__)         \
140         do {                                    \
141                 ta--;                           \
142                 __get_mapper_io(pr)->active = 0;\
143                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144                 st_cond_wait(__pr->cond);       \
145         } while (__condition__)
146
147 #define wait_on_mapnode(__mn, __condition__)    \
148         do {                                    \
149                 ta--;                           \
150                 __mn->waiters++;                \
151                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153                 st_cond_wait(__mn->cond);       \
154         } while (__condition__)
155
156 #define wait_on_map(__map, __condition__)       \
157         do {                                    \
158                 ta--;                           \
159                 __map->waiters++;               \
160                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161                                    __map, __map->volume, __map->waiters, ta); \
162                 st_cond_wait(__map->cond);      \
163         } while (__condition__)
164
165 #define signal_pr(__pr)                         \
166         do {                                    \
167                 if (!__get_mapper_io(pr)->active){\
168                         ta++;                   \
169                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170                         __get_mapper_io(pr)->active = 1;\
171                         st_cond_signal(__pr->cond);     \
172                 }                               \
173         }while(0)
174
175 #define signal_map(__map)                       \
176         do {                                    \
177                 XSEGLOG2(&lc, D, "Checking map %lx %s. Waiters %u, ta: %u", \
178                                 __map, __map->volume, __map->waiters, ta);  \
179                 if (__map->waiters) {           \
180                         ta += __map->waiters;           \
181                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
182                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
183                         __map->waiters = 0;     \
184                         st_cond_broadcast(__map->cond); \
185                 }                               \
186         }while(0)
187
188 #define signal_mapnode(__mn)                    \
189         do {                                    \
190                 if (__mn->waiters) {            \
191                         ta += __mn->waiters;    \
192                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
193                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
194                         __mn->waiters = 0;      \
195                         st_cond_broadcast(__mn->cond);  \
196                 }                               \
197         }while(0)
198
199
200 /* map flags */
201 #define MF_MAP_LOADING          (1 << 0)
202 #define MF_MAP_DESTROYED        (1 << 1)
203 #define MF_MAP_WRITING          (1 << 2)
204 #define MF_MAP_DELETING         (1 << 3)
205 #define MF_MAP_DROPPING_CACHE   (1 << 4)
206 #define MF_MAP_EXCLUSIVE        (1 << 5)
207 #define MF_MAP_OPENING          (1 << 6)
208 #define MF_MAP_CLOSING          (1 << 7)
209 #define MF_MAP_DELETED          (1 << 8)
210 #define MF_MAP_SNAPSHOTTING     (1 << 9)
211
212 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
213                                 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|          \
214                                 MF_MAP_SNAPSHOTTING)
215
216 struct map {
217         uint32_t version;
218         uint32_t flags;
219         uint64_t size;
220         uint32_t volumelen;
221         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
222         xhash_t *objects;       /* obj_index --> map_node */
223         uint32_t ref;
224         uint32_t waiters;
225         st_cond_t cond;
226 };
227
228 struct mapperd {
229         xport bportno;          /* blocker that accesses data */
230         xport mbportno;         /* blocker that accesses maps */
231         xhash_t *hashmaps; // hash_function(target) --> struct map
232 };
233
234 struct mapper_io {
235         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
236         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
237         struct map_node *copyup_node;
238         volatile int err;                       /* error flag */
239         volatile uint64_t del_pending;
240         volatile uint64_t snap_pending;
241         uint64_t delobj;
242         uint64_t dcobj;
243         cb_t cb;
244         enum mapper_state state;
245         volatile int active;
246 };
247
248 /* global vars */
249 struct mapperd *mapper;
250
251 void print_map(struct map *m);
252
253
254 void custom_peer_usage()
255 {
256         fprintf(stderr, "Custom peer options: \n"
257                         "-bp  : port for block blocker(!)\n"
258                         "-mbp : port for map blocker\n"
259                         "\n");
260 }
261
262
263 /*
264  * Helper functions
265  */
266
267 static inline struct mapperd * __get_mapperd(struct peerd *peer)
268 {
269         return (struct mapperd *) peer->priv;
270 }
271
272 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
273 {
274         return (struct mapper_io *) pr->priv;
275 }
276
277 static inline uint64_t calc_map_obj(struct map *map)
278 {
279         if (map->size == -1)
280                 return 0;
281         uint64_t nr_objs = map->size / block_size;
282         if (map->size % block_size)
283                 nr_objs++;
284         return nr_objs;
285 }
286
287 static uint32_t calc_nr_obj(struct xseg_request *req)
288 {
289         unsigned int r = 1;
290         uint64_t rem_size = req->size;
291         uint64_t obj_offset = req->offset & (block_size -1); //modulo
292         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
293         rem_size -= obj_size;
294         while (rem_size > 0) {
295                 obj_size = (rem_size > block_size) ? block_size : rem_size;
296                 rem_size -= obj_size;
297                 r++;
298         }
299
300         return r;
301 }
302
303 /* hexlify function.
304  * Unsafe. Doesn't check if data length is odd!
305  */
306
307 static void hexlify(unsigned char *data, char *hex)
308 {
309         int i;
310         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
311                 sprintf(hex+2*i, "%02x", data[i]);
312 }
313
314 static void unhexlify(char *hex, unsigned char *data)
315 {
316         int i;
317         char c;
318         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
319                 data[i] = 0;
320                 c = hex[2*i];
321                 if (isxdigit(c)){
322                         if (isdigit(c)){
323                                 c-= '0';
324                         }
325                         else {
326                                 c = tolower(c);
327                                 c = c-'a' + 10;
328                         }
329                 }
330                 else {
331                         c = 0;
332                 }
333                 data[i] |= (c << 4) & 0xF0;
334                 c = hex[2*i+1];
335                 if (isxdigit(c)){
336                         if (isdigit(c)){
337                                 c-= '0';
338                         }
339                         else {
340                                 c = tolower(c);
341                                 c = c-'a' + 10;
342                         }
343                 }
344                 else {
345                         c = 0;
346                 }
347                 data[i] |= c & 0x0F;
348         }
349 }
350
351 void merkle_hash(unsigned char *hashes, unsigned long len,
352                 unsigned char hash[SHA256_DIGEST_SIZE])
353 {
354         uint32_t i, l, s = 2;
355         uint32_t nr = len/SHA256_DIGEST_SIZE;
356         unsigned char *buf;
357         unsigned char tmp_hash[SHA256_DIGEST_SIZE];
358
359         if (!nr){
360                 SHA256(hashes, 0, hash);
361                 return;
362         }
363         if (nr == 1){
364                 memcpy(hash, hashes, SHA256_DIGEST_SIZE);
365                 return;
366         }
367         while (s < nr)
368                 s = s << 1;
369         buf = malloc(sizeof(unsigned char)* SHA256_DIGEST_SIZE * s);
370         memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
371         memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
372         for (l = s; l > 1; l = l/2) {
373                 for (i = 0; i < l; i += 2) {
374                         SHA256(buf + (i * SHA256_DIGEST_SIZE),
375                                         2 * SHA256_DIGEST_SIZE, tmp_hash);
376                         memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
377                                         tmp_hash, SHA256_DIGEST_SIZE);
378                 }
379         }
380         memcpy(hash, buf, SHA256_DIGEST_SIZE);
381 }
382
383 /*
384  * Maps handling functions
385  */
386
387 static struct map * find_map(struct mapperd *mapper, char *volume)
388 {
389         struct map *m = NULL;
390         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
391                                 (xhashidx *) &m);
392         if (r < 0)
393                 return NULL;
394         return m;
395 }
396
397 static struct map * find_map_len(struct mapperd *mapper, char *target,
398                                         uint32_t targetlen, uint32_t flags)
399 {
400         char buf[XSEG_MAX_TARGETLEN+1];
401         if (flags & MF_ARCHIP){
402                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
403                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
404                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
405                 targetlen += MAPPER_PREFIX_LEN;
406         }
407         else {
408                 strncpy(buf, target, targetlen);
409                 buf[targetlen] = 0;
410         }
411
412         if (targetlen > MAX_VOLUME_LEN){
413                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
414                                         targetlen, MAX_VOLUME_LEN);
415                 return NULL;
416         }
417
418         XSEGLOG2(&lc, D, "looking up map %s, len %u",
419                         buf, targetlen);
420         return find_map(mapper, buf);
421 }
422
423
424 static int insert_map(struct mapperd *mapper, struct map *map)
425 {
426         int r = -1;
427
428         if (find_map(mapper, map->volume)){
429                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
430                 goto out;
431         }
432
433         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
434                         map->volume, strlen(map->volume), (unsigned long) map);
435         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
436         while (r == -XHASH_ERESIZE) {
437                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
438                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
439                 if (!new_hashmap){
440                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
441                                         (unsigned long long) shift);
442                         goto out;
443                 }
444                 mapper->hashmaps = new_hashmap;
445                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
446         }
447 out:
448         return r;
449 }
450
451 static int remove_map(struct mapperd *mapper, struct map *map)
452 {
453         int r = -1;
454
455         //assert no pending pr on map
456
457         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
458         while (r == -XHASH_ERESIZE) {
459                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
460                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
461                 if (!new_hashmap){
462                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
463                                         (unsigned long long) shift);
464                         goto out;
465                 }
466                 mapper->hashmaps = new_hashmap;
467                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
468         }
469 out:
470         return r;
471 }
472
473 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
474 {
475         int r;
476         xport p;
477         struct peerd *peer = pr->peer;
478         struct xseg_request *req;
479         struct mapperd *mapper = __get_mapperd(peer);
480         void *dummy;
481
482         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
483
484         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
485         if (!req){
486                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
487                                 map->volume);
488                 goto out_err;
489         }
490
491         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
492         if (r < 0){
493                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
494                                 map->volume);
495                 goto out_put;
496         }
497
498         char *reqtarget = xseg_get_target(peer->xseg, req);
499         if (!reqtarget)
500                 goto out_put;
501         strncpy(reqtarget, map->volume, req->targetlen);
502         req->op = X_RELEASE;
503         req->size = 0;
504         req->offset = 0;
505         r = xseg_set_req_data(peer->xseg, req, pr);
506         if (r < 0){
507                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
508                                 map->volume);
509                 goto out_put;
510         }
511         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
512         if (p == NoPort){
513                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
514                                 map->volume);
515                 goto out_unset;
516         }
517         r = xseg_signal(peer->xseg, p);
518         map->flags |= MF_MAP_CLOSING;
519
520         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
521         return req;
522
523 out_unset:
524         xseg_get_req_data(peer->xseg, req, &dummy);
525 out_put:
526         xseg_put_request(peer->xseg, req, pr->portno);
527 out_err:
528         return NULL;
529 }
530
531 static int close_map(struct peer_req *pr, struct map *map)
532 {
533         int err;
534         struct xseg_request *req;
535         struct peerd *peer = pr->peer;
536
537         req = __close_map(pr, map);
538         if (!req)
539                 return -1;
540         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
541         map->flags &= ~MF_MAP_CLOSING;
542         err = req->state & XS_FAILED;
543         xseg_put_request(peer->xseg, req, pr->portno);
544         if (err)
545                 return -1;
546         return 0;
547 }
548
549 /*
550 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
551                                 char *target, uint32_t targetlen, struct map **m)
552 {
553         struct mapperd *mapper = __get_mapperd(peer);
554         int r;
555         *m = find_map(mapper, target, targetlen);
556         if (*m) {
557                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
558                 if ((*m)->flags & MF_MAP_NOT_READY) {
559                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
560                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
561                         return MF_PENDING;
562                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
563                 //      return -1;
564                 // 
565                 }else {
566                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
567                         return 0;
568                 }
569         }
570         r = open_map(peer, pr, target, targetlen, 0);
571         if (r < 0)
572                 return -1; //error
573         return MF_PENDING;      
574 }
575 */
576 /*
577  * Object handling functions
578  */
579
580 struct map_node *find_object(struct map *map, uint64_t obj_index)
581 {
582         struct map_node *mn;
583         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
584         if (r < 0)
585                 return NULL;
586         return mn;
587 }
588
589 static int insert_object(struct map *map, struct map_node *mn)
590 {
591         //FIXME no find object first
592         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
593         if (r == -XHASH_ERESIZE) {
594                 unsigned long shift = xhash_grow_size_shift(map->objects);
595                 map->objects = xhash_resize(map->objects, shift, NULL);
596                 if (!map->objects)
597                         return -1;
598                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
599         }
600         return r;
601 }
602
603
604 /*
605  * map read/write functions
606  *
607  * version 0 -> pithos map
608  * version 1 -> archipelago version 1
609  *
610  *
611  * functions
612  *      int read_object(struct map_node *mn, unsigned char *buf)
613  *      int prepare_write_object(struct peer_req *pr, struct map *map,
614  *                              struct map_node *mn, struct xseg_request *req)
615  *      int read_map(struct map *m, unsigned char * data)
616  *      int prepare_write_map(struct peer_req *pr, struct map *map,
617  *                                      struct xseg_request *req)
618  */
619
620 struct map_functions {
621         int (*read_object)(struct map_node *mn, unsigned char *buf);
622         int (*prepare_write_object)(struct peer_req *pr, struct map *map,
623                                 struct map_node *mn, struct xseg_request *req);
624         int (*read_map)(struct map *m, unsigned char * data);
625         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
626                                         struct xseg_request *req);
627 };
628
629 /* version 0 functions */
630
631 /* no header */
632 #define v0_mapheader_size 0
633 /* just the unhexlified name */
634 #define v0_objectsize_in_map SHA256_DIGEST_SIZE
635
636 static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
637 {
638         hexlify(buf, mn->object);
639         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
640         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
641         mn->flags = MF_OBJECT_EXIST;
642
643         return 0;
644 }
645
646 static void v0_object_to_map(struct map_node *mn, unsigned char *data)
647 {
648         unhexlify(mn->object, data);
649 }
650
651 static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
652                         struct map_node *mn, struct xseg_request *req)
653 {
654         struct peerd *peer = pr->peer;
655         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
656         if (r < 0){
657                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
658                                 "(Map: %s [%llu]",
659                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
660                 return -1;
661         }
662         char *target = xseg_get_target(peer->xseg, req);
663         strncpy(target, map->volume, req->targetlen);
664         req->size = req->datalen;
665         req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
666
667         unsigned char *data = xseg_get_data(pr->peer->xseg, req);
668         v0_object_to_map(mn, data);
669         return -1;
670 }
671
672 static int read_map_v0(struct map *m, unsigned char * data)
673 {
674         int r;
675         struct map_node *map_node;
676         uint64_t i;
677         uint64_t pos = 0;
678         uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
679         XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
680         char nulls[SHA256_DIGEST_SIZE];
681         memset(nulls, 0, SHA256_DIGEST_SIZE);
682         map_node = calloc(max_nr_objs, sizeof(struct map_node));
683         if (!map_node)
684                 return -1;
685         for (i = 0; i < max_nr_objs; i++) {
686                 if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
687                         break;
688                 map_node[i].objectidx = i;
689                 map_node[i].map = m;
690                 map_node[i].waiters = 0;
691                 map_node[i].ref = 1;
692                 map_node[i].cond = st_cond_new(); //FIXME err check;
693                 read_object_v0(&map_node[i], data+pos);
694                 pos += v0_objectsize_in_map;
695                 r = insert_object(m, &map_node[i]); //FIXME error check
696         }
697         XSEGLOG2(&lc, D, "Found %llu objects", i);
698         m->size = i * block_size;
699         return 0;
700 }
701
702 static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
703                                 struct xseg_request *req)
704 {
705         struct peerd *peer = pr->peer;
706         uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
707         struct map_node *mn;
708         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
709                         v0_mapheader_size + max_objidx * v0_objectsize_in_map);
710         if (r < 0){
711                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
712                 return -1;
713         }
714         char *target = xseg_get_target(peer->xseg, req);
715         strncpy(target, map->volume, req->targetlen);
716         char *data = xseg_get_data(peer->xseg, req);
717
718         req->op = X_WRITE;
719         req->size = req->datalen;
720         req->offset = 0;
721
722         for (i = 0; i < max_objidx; i++) {
723                 mn = find_object(map, i);
724                 if (!mn){
725                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
726                                         (unsigned long long) i, map->volume);
727                         return -1;
728                 }
729                 v0_object_to_map(mn, (unsigned char *)(data+pos));
730                 pos += v0_objectsize_in_map;
731         }
732         XSEGLOG2(&lc, D, "Prepared %llu objects", i);
733         return 0;
734 }
735
736 /* static struct map_functions map_functions_v0 =       { */
737 /*                      .read_object = read_object_v0, */
738 /*                      .read_map = read_map_v0, */
739 /*                      .prepare_write_object = prepare_write_object_v0, */
740 /*                      .prepare_write_map = prepare_write_map_v0 */
741 /* }; */
742 #define map_functions_v0 {                              \
743                         .read_object = read_object_v0,  \
744                         .read_map = read_map_v0,        \
745                         .prepare_write_object = prepare_write_object_v0,\
746                         .prepare_write_map = prepare_write_map_v0       \
747                         }
748 /* v1 functions */
749
750 /* transparency byte + max object len in disk */
751 #define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
752
753 /* Map header contains:
754  *      map version
755  *      volume size
756  */
757 #define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
758
759 static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
760 {
761         char c = buf[0];
762         mn->flags = 0;
763         if (c){
764                 mn->flags |= MF_OBJECT_EXIST;
765                 strcpy(mn->object, MAPPER_PREFIX);
766                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
767                 mn->object[MAX_OBJECT_LEN] = 0;
768                 mn->objectlen = strlen(mn->object);
769         }
770         else {
771                 mn->flags &= ~MF_OBJECT_EXIST;
772                 hexlify(buf+1, mn->object);
773                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
774                 mn->objectlen = strlen(mn->object);
775         }
776         return 0;
777 }
778
779 static inline void v1_object_to_map(char* buf, struct map_node *mn)
780 {
781         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
782         if (buf[0]){
783                 /* strip common prefix */
784                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
785         }
786         else {
787                 unhexlify(mn->object, (unsigned char *)(buf+1));
788         }
789 }
790
791 static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
792                                 struct map_node *mn, struct xseg_request *req)
793 {
794         struct peerd *peer = pr->peer;
795         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
796         if (r < 0){
797                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
798                                 "(Map: %s [%llu]",
799                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
800                 return -1;
801         }
802         char *target = xseg_get_target(peer->xseg, req);
803         strncpy(target, map->volume, req->targetlen);
804         req->size = req->datalen;
805         req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
806
807         char *data = xseg_get_data(pr->peer->xseg, req);
808         v1_object_to_map(data, mn);
809         return 0;
810 }
811
812 static int read_map_v1(struct map *m, unsigned char * data)
813 {
814         int r;
815         struct map_node *map_node;
816         uint64_t i;
817         uint64_t pos = 0;
818         uint64_t nr_objs;
819
820         /* read header */
821         m->version = *(uint32_t *) (data + pos);
822         pos += sizeof(uint32_t);
823         m->size = *(uint64_t *) (data + pos);
824         pos += sizeof(uint64_t);
825
826         /* read objects */
827         nr_objs = m->size / block_size;
828         if (m->size % block_size)
829                 nr_objs++;
830         map_node = calloc(nr_objs, sizeof(struct map_node));
831         if (!map_node)
832                 return -1;
833
834         for (i = 0; i < nr_objs; i++) {
835                 map_node[i].map = m;
836                 map_node[i].objectidx = i;
837                 map_node[i].waiters = 0;
838                 map_node[i].ref = 1;
839                 map_node[i].cond = st_cond_new(); //FIXME err check;
840                 read_object_v1(&map_node[i], data+pos);
841                 pos += objectsize_in_map;
842                 r = insert_object(m, &map_node[i]); //FIXME error check
843         }
844         return 0;
845 }
846
847 static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
848                                 struct xseg_request *req)
849 {
850         struct peerd *peer = pr->peer;
851         uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
852         struct map_node *mn;
853
854         int r = xseg_prep_request(peer->xseg, req, m->volumelen,
855                         v1_mapheader_size + max_objidx * v1_objectsize_in_map);
856         if (r < 0){
857                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
858                 return -1;
859         }
860         char *target = xseg_get_target(peer->xseg, req);
861         strncpy(target, m->volume, req->targetlen);
862         char *data = xseg_get_data(peer->xseg, req);
863
864         memcpy(data + pos, &m->version, sizeof(m->version));
865         pos += sizeof(m->version);
866         memcpy(data + pos, &m->size, sizeof(m->size));
867         pos += sizeof(m->size);
868
869         req->op = X_WRITE;
870         req->size = req->datalen;
871         req->offset = 0;
872
873         for (i = 0; i < max_objidx; i++) {
874                 mn = find_object(m, i);
875                 if (!mn){
876                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
877                                         (unsigned long long) i, m->volume);
878                         return -1;
879                 }
880                 v1_object_to_map(data+pos, mn);
881                 pos += v1_objectsize_in_map;
882         }
883         return 0;
884 }
885
886 /* static struct map_functions map_functions_v1 =       { */
887 /*                      .read_object = read_object_v1, */
888 /*                      .read_map = read_map_v1, */
889 /*                      .prepare_write_object = prepare_write_object_v1, */
890 /*                      .prepare_write_map = prepare_write_map_v1 */
891 /* }; */
892 #define map_functions_v1 {                              \
893                         .read_object = read_object_v1,  \
894                         .read_map = read_map_v1,        \
895                         .prepare_write_object = prepare_write_object_v1,\
896                         .prepare_write_map = prepare_write_map_v1       \
897                         }
898
899 static struct map_functions map_functions[] = { map_functions_v0,
900                                                 map_functions_v1 };
901 #define MAP_LATEST_VERSION 1
902 /* end of functions */
903
904
905
906
907
908 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
909 {
910         hexlify(buf, mn->object);
911         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
912         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
913         mn->flags = MF_OBJECT_EXIST;
914 }
915
916 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
917 {
918         char c = buf[0];
919         mn->flags = 0;
920         if (c){
921                 mn->flags |= MF_OBJECT_EXIST;
922                 strcpy(mn->object, MAPPER_PREFIX);
923                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
924                 mn->object[MAX_OBJECT_LEN] = 0;
925                 mn->objectlen = strlen(mn->object);
926         }
927         else {
928                 hexlify(buf+1, mn->object);
929                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
930                 mn->objectlen = strlen(mn->object);
931         }
932
933 }
934
935 static inline void object_to_map(char* buf, struct map_node *mn)
936 {
937         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
938         if (buf[0]){
939                 /* strip common prefix */
940                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
941         }
942         else {
943                 unhexlify(mn->object, (unsigned char *)(buf+1));
944         }
945 }
946
947 static inline void mapheader_to_map(struct map *m, char *buf)
948 {
949         uint64_t pos = 0;
950         memcpy(buf + pos, &m->version, sizeof(m->version));
951         pos += sizeof(m->version);
952         memcpy(buf + pos, &m->size, sizeof(m->size));
953         pos += sizeof(m->size);
954 }
955
956
957 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
958                                 struct map *map, struct map_node *mn)
959 {
960         int r;
961         void *dummy;
962         struct mapperd *mapper = __get_mapperd(peer);
963         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
964                                                         mapper->mbportno, X_ALLOC);
965         if (!req){
966                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
967                                 "(Map: %s [%llu]",
968                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
969                 goto out_err;
970         }
971
972         r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
973         if (r < 0){
974                 XSEGLOG2(&lc, E, "Cannot prepare write object");
975                 goto out_put;
976         }
977         req->op = X_WRITE;
978
979         r = xseg_set_req_data(peer->xseg, req, pr);
980         if (r < 0){
981                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
982                                 "(Map: %s [%llu]",
983                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
984                 goto out_put;
985         }
986         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
987         if (p == NoPort){
988                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
989                                 "(Map: %s [%llu]",
990                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
991                 goto out_unset;
992         }
993         r = xseg_signal(peer->xseg, p);
994         if (r < 0)
995                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
996
997         XSEGLOG2(&lc, I, "Writing object %s \n\t"
998                         "Map: %s [%llu]",
999                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1000
1001         return req;
1002
1003 out_unset:
1004         xseg_get_req_data(peer->xseg, req, &dummy);
1005 out_put:
1006         xseg_put_request(peer->xseg, req, pr->portno);
1007 out_err:
1008         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
1009                         "(Map: %s [%llu]",
1010                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1011         return NULL;
1012 }
1013
1014 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
1015 {
1016         int r;
1017         void *dummy;
1018         struct peerd *peer = pr->peer;
1019         struct mapperd *mapper = __get_mapperd(peer);
1020         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1021                                                         mapper->mbportno, X_ALLOC);
1022         if (!req){
1023                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
1024                 goto out_err;
1025         }
1026
1027         r = map_functions[map->version].prepare_write_map(pr, map, req);
1028         if (r < 0){
1029                 XSEGLOG2(&lc, E, "Cannot prepare write map");
1030                 goto out_put;
1031         }
1032
1033         req->op = X_WRITE;
1034
1035         r = xseg_set_req_data(peer->xseg, req, pr);
1036         if (r < 0){
1037                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1038                                 map->volume);
1039                 goto out_put;
1040         }
1041         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1042         if (p == NoPort){
1043                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1044                                 map->volume);
1045                 goto out_unset;
1046         }
1047         r = xseg_signal(peer->xseg, p);
1048         if (r < 0)
1049                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1050
1051         map->flags |= MF_MAP_WRITING;
1052         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1053         return req;
1054
1055 out_unset:
1056         xseg_get_req_data(peer->xseg, req, &dummy);
1057 out_put:
1058         xseg_put_request(peer->xseg, req, pr->portno);
1059 out_err:
1060         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1061         return NULL;
1062 }
1063
1064 static int write_map(struct peer_req* pr, struct map *map)
1065 {
1066         int r = 0;
1067         struct peerd *peer = pr->peer;
1068         struct xseg_request *req = __write_map(pr, map);
1069         if (!req)
1070                 return -1;
1071         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1072         if (req->state & XS_FAILED)
1073                 r = -1;
1074         xseg_put_request(peer->xseg, req, pr->portno);
1075         map->flags &= ~MF_MAP_WRITING;
1076         return r;
1077 }
1078
1079 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1080 {
1081         int r;
1082         xport p;
1083         struct xseg_request *req;
1084         struct peerd *peer = pr->peer;
1085         struct mapperd *mapper = __get_mapperd(peer);
1086         void *dummy;
1087
1088         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1089
1090         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1091         if (!req){
1092                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1093                                 m->volume);
1094                 goto out_fail;
1095         }
1096
1097         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1098         if (r < 0){
1099                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1100                                 m->volume);
1101                 goto out_put;
1102         }
1103
1104         char *reqtarget = xseg_get_target(peer->xseg, req);
1105         if (!reqtarget)
1106                 goto out_put;
1107         strncpy(reqtarget, m->volume, req->targetlen);
1108         req->op = X_READ;
1109         req->size = block_size;
1110         req->offset = 0;
1111         r = xseg_set_req_data(peer->xseg, req, pr);
1112         if (r < 0){
1113                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1114                                 m->volume);
1115                 goto out_put;
1116         }
1117         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1118         if (p == NoPort){
1119                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1120                                 m->volume);
1121                 goto out_unset;
1122         }
1123         r = xseg_signal(peer->xseg, p);
1124
1125         m->flags |= MF_MAP_LOADING;
1126         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1127         return req;
1128
1129 out_unset:
1130         xseg_get_req_data(peer->xseg, req, &dummy);
1131 out_put:
1132         xseg_put_request(peer->xseg, req, pr->portno);
1133 out_fail:
1134         return NULL;
1135 }
1136
1137 static int read_map (struct map *map, unsigned char *buf)
1138 {
1139         char nulls[SHA256_DIGEST_SIZE];
1140         memset(nulls, 0, SHA256_DIGEST_SIZE);
1141
1142         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1143         if (r) {
1144                 XSEGLOG2(&lc, E, "Read zeros");
1145                 return -1;
1146         }
1147         //type 1, archip type, type 0 pithos map
1148         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1149         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1150         uint32_t version;
1151         if (type)
1152                 version = *(uint32_t *) (buf); //version should always be the first uint32_t
1153         else
1154                 version = 0;
1155         if (version > MAP_LATEST_VERSION){
1156                 XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1157                                 map->volume, version);
1158                 return -1;
1159         }
1160
1161         r = map_functions[version].read_map(map, buf);
1162         if (r < 0){
1163                 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1164                 return -1;
1165         }
1166
1167         print_map(map);
1168         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1169         return 0;
1170
1171 }
1172
1173 static int load_map(struct peer_req *pr, struct map *map)
1174 {
1175         int r = 0;
1176         struct xseg_request *req;
1177         struct peerd *peer = pr->peer;
1178         req = __load_map(pr, map);
1179         if (!req)
1180                 return -1;
1181         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1182         map->flags &= ~MF_MAP_LOADING;
1183         if (req->state & XS_FAILED){
1184                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1185                 xseg_put_request(peer->xseg, req, pr->portno);
1186                 return -1;
1187         }
1188         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1189         xseg_put_request(peer->xseg, req, pr->portno);
1190         return r;
1191 }
1192
1193 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1194                                                 uint32_t flags)
1195 {
1196         int r;
1197         xport p;
1198         struct xseg_request *req;
1199         struct peerd *peer = pr->peer;
1200         struct mapperd *mapper = __get_mapperd(peer);
1201         void *dummy;
1202
1203         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1204
1205         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1206         if (!req){
1207                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1208                                 m->volume);
1209                 goto out_fail;
1210         }
1211
1212         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1213         if (r < 0){
1214                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1215                                 m->volume);
1216                 goto out_put;
1217         }
1218
1219         char *reqtarget = xseg_get_target(peer->xseg, req);
1220         if (!reqtarget)
1221                 goto out_put;
1222         strncpy(reqtarget, m->volume, req->targetlen);
1223         req->op = X_ACQUIRE;
1224         req->size = block_size;
1225         req->offset = 0;
1226         if (!(flags & MF_FORCE))
1227                 req->flags = XF_NOSYNC;
1228         r = xseg_set_req_data(peer->xseg, req, pr);
1229         if (r < 0){
1230                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1231                                 m->volume);
1232                 goto out_put;
1233         }
1234         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1235         if (p == NoPort){ 
1236                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1237                                 m->volume);
1238                 goto out_unset;
1239         }
1240         r = xseg_signal(peer->xseg, p);
1241
1242         m->flags |= MF_MAP_OPENING;
1243         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1244         return req;
1245
1246 out_unset:
1247         xseg_get_req_data(peer->xseg, req, &dummy);
1248 out_put:
1249         xseg_put_request(peer->xseg, req, pr->portno);
1250 out_fail:
1251         return NULL;
1252 }
1253
1254 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1255 {
1256         int err;
1257         struct xseg_request *req;
1258         struct peerd *peer = pr->peer;
1259
1260         req = __open_map(pr, map, flags);
1261         if (!req){
1262                 return -1;
1263         }
1264         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1265         map->flags &= ~MF_MAP_OPENING;
1266         err = req->state & XS_FAILED;
1267         xseg_put_request(peer->xseg, req, pr->portno);
1268         if (err)
1269                 return -1;
1270         else
1271                 map->flags |= MF_MAP_EXCLUSIVE;
1272         return 0;
1273 }
1274
1275 /*
1276  * copy up functions
1277  */
1278
1279 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1280 {
1281         int r = 0;
1282         if (mn){
1283                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1284                                 req, mn, mio);
1285                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1286                 if (r == -XHASH_ERESIZE) {
1287                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1288                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1289                         if (!new_hashmap)
1290                                 goto out;
1291                         mio->copyups_nodes = new_hashmap;
1292                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1293                 }
1294                 if (r < 0)
1295                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1296                                         req, mn, mio);
1297         }
1298         else {
1299                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1300                                 req, mio);
1301                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1302                 if (r == -XHASH_ERESIZE) {
1303                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1304                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1305                         if (!new_hashmap)
1306                                 goto out;
1307                         mio->copyups_nodes = new_hashmap;
1308                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1309                 }
1310                 if (r < 0)
1311                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1312                                         req, mio);
1313         }
1314 out:
1315         return r;
1316 }
1317
1318 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1319 {
1320         struct map_node *mn;
1321         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1322         if (r < 0){
1323                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1324                 return NULL;
1325         }
1326         XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1327         return mn;
1328 }
1329
1330 static struct xseg_request * __snapshot_object(struct peer_req *pr,
1331                                                 struct map_node *mn)
1332 {
1333         struct peerd *peer = pr->peer;
1334         struct mapperd *mapper = __get_mapperd(peer);
1335         struct mapper_io *mio = __get_mapper_io(pr);
1336         //struct map *map = mn->map;
1337         void *dummy;
1338         int r = -1;
1339         xport p;
1340
1341         //assert mn->volume != zero_block
1342         //assert mn->flags & MF_OBJECT_EXIST
1343         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1344                                                 mapper->bportno, X_ALLOC);
1345         if (!req){
1346                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1347                 goto out_err;
1348         }
1349         r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1350                                 sizeof(struct xseg_request_snapshot));
1351         if (r < 0){
1352                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1353                 goto out_put;
1354         }
1355
1356         char *target = xseg_get_target(peer->xseg, req);
1357         strncpy(target, mn->object, req->targetlen);
1358
1359         struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1360         xsnapshot->target[0] = 0;
1361         xsnapshot->targetlen = 0;
1362
1363         req->offset = 0;
1364         req->size = block_size;
1365         req->op = X_SNAPSHOT;
1366         r = xseg_set_req_data(peer->xseg, req, pr);
1367         if (r<0){
1368                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1369                 goto out_put;
1370         }
1371         r = __set_copyup_node(mio, req, mn);
1372         if (r < 0)
1373                 goto out_unset;
1374         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1375         if (p == NoPort) {
1376                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1377                 goto out_mapper_unset;
1378         }
1379         xseg_signal(peer->xseg, p);
1380
1381         mn->flags |= MF_OBJECT_SNAPSHOTTING;
1382         XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1383         return req;
1384
1385 out_mapper_unset:
1386         __set_copyup_node(mio, req, NULL);
1387 out_unset:
1388         xseg_get_req_data(peer->xseg, req, &dummy);
1389 out_put:
1390         xseg_put_request(peer->xseg, req, pr->portno);
1391 out_err:
1392         XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1393         return NULL;
1394 }
1395
1396 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1397 {
1398         struct mapperd *mapper = __get_mapperd(peer);
1399         struct mapper_io *mio = __get_mapper_io(pr);
1400         struct map *map = mn->map;
1401         void *dummy;
1402         int r = -1;
1403         xport p;
1404
1405         uint32_t newtargetlen;
1406         char new_target[MAX_OBJECT_LEN + 1];
1407         unsigned char sha[SHA256_DIGEST_SIZE];
1408
1409         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1410
1411         char tmp[XSEG_MAX_TARGETLEN + 1];
1412         uint32_t tmplen;
1413         strncpy(tmp, map->volume, map->volumelen);
1414         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1415         tmp[XSEG_MAX_TARGETLEN] = 0;
1416         tmplen = strlen(tmp);
1417         XSEGLOG2(&lc, D, "Base for new target: %s (len: %d)", tmp, tmplen);
1418         SHA256((unsigned char *)tmp, tmplen, sha);
1419         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1420         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1421         #define fooooo 71
1422         XSEGLOG2(&lc, D, "New target: %.71s (len: %d)", new_target, newtargetlen);
1423         #undef fooooo
1424
1425         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1426                 goto copyup_zeroblock;
1427
1428         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1429                                                 mapper->bportno, X_ALLOC);
1430         if (!req){
1431                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1432                 goto out_err;
1433         }
1434         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1435                                 sizeof(struct xseg_request_copy));
1436         if (r < 0){
1437                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1438                 goto out_put;
1439         }
1440
1441         char *target = xseg_get_target(peer->xseg, req);
1442         strncpy(target, new_target, req->targetlen);
1443
1444         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1445         strncpy(xcopy->target, mn->object, mn->objectlen);
1446         xcopy->targetlen = mn->objectlen;
1447
1448         req->offset = 0;
1449         req->size = block_size;
1450         req->op = X_COPY;
1451         r = xseg_set_req_data(peer->xseg, req, pr);
1452         if (r<0){
1453                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1454                 goto out_put;
1455         }
1456         r = __set_copyup_node(mio, req, mn);
1457         if (r < 0)
1458                 goto out_unset;
1459         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1460         if (p == NoPort) {
1461                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1462                 goto out_mapper_unset;
1463         }
1464         xseg_signal(peer->xseg, p);
1465 //      mio->copyups++;
1466
1467         mn->flags |= MF_OBJECT_COPYING;
1468         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1469         return req;
1470
1471 out_mapper_unset:
1472         __set_copyup_node(mio, req, NULL);
1473 out_unset:
1474         xseg_get_req_data(peer->xseg, req, &dummy);
1475 out_put:
1476         xseg_put_request(peer->xseg, req, pr->portno);
1477 out_err:
1478         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1479         return NULL;
1480
1481 copyup_zeroblock:
1482         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1483                         "Proceeding in writing the new object in map");
1484         /* construct a tmp map_node for writing purposes */
1485         struct map_node newmn = *mn;
1486         newmn.flags = MF_OBJECT_EXIST;
1487         strncpy(newmn.object, new_target, newtargetlen);
1488         newmn.object[newtargetlen] = 0;
1489         newmn.objectlen = newtargetlen;
1490         newmn.objectidx = mn->objectidx; 
1491         req = object_write(peer, pr, map, &newmn);
1492         r = __set_copyup_node(mio, req, mn);
1493         if (r < 0)
1494                 return NULL;
1495         if (!req){
1496                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1497                                 "\n\t of map %s [%llu]",
1498                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1499                 return NULL;
1500         }
1501         mn->flags |= MF_OBJECT_WRITING;
1502         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1503         return req;
1504 }
1505
1506 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1507 {
1508         void *dummy;
1509         struct peerd *peer = pr->peer;
1510         struct mapperd *mapper = __get_mapperd(peer);
1511         struct mapper_io *mio = __get_mapper_io(pr);
1512         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1513                                                         mapper->bportno, X_ALLOC);
1514         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1515         if (!req){
1516                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1517                 goto out_err;
1518         }
1519         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1520         if (r < 0){
1521                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1522                 goto out_put;
1523         }
1524         char *target = xseg_get_target(peer->xseg, req);
1525         strncpy(target, mn->object, req->targetlen);
1526         req->op = X_DELETE;
1527         req->size = req->datalen;
1528         req->offset = 0;
1529         r = xseg_set_req_data(peer->xseg, req, pr);
1530         if (r < 0){
1531                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1532                 goto out_put;
1533         }
1534         r = __set_copyup_node(mio, req, mn);
1535         if (r < 0)
1536                 goto out_unset;
1537         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1538         if (p == NoPort){
1539                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1540                 goto out_mapper_unset;
1541         }
1542         r = xseg_signal(peer->xseg, p);
1543         mn->flags |= MF_OBJECT_DELETING;
1544         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1545         return req;
1546
1547 out_mapper_unset:
1548         __set_copyup_node(mio, req, NULL);
1549 out_unset:
1550         xseg_get_req_data(peer->xseg, req, &dummy);
1551 out_put:
1552         xseg_put_request(peer->xseg, req, pr->portno);
1553 out_err:
1554         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1555         return NULL;
1556 }
1557
1558 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1559 {
1560         void *dummy;
1561         struct peerd *peer = pr->peer;
1562         struct mapperd *mapper = __get_mapperd(peer);
1563         struct mapper_io *mio = __get_mapper_io(pr);
1564         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1565                                                         mapper->mbportno, X_ALLOC);
1566         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1567         if (!req){
1568                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1569                 goto out_err;
1570         }
1571         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1572         if (r < 0){
1573                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1574                 goto out_put;
1575         }
1576         char *target = xseg_get_target(peer->xseg, req);
1577         strncpy(target, map->volume, req->targetlen);
1578         req->op = X_DELETE;
1579         req->size = req->datalen;
1580         req->offset = 0;
1581         r = xseg_set_req_data(peer->xseg, req, pr);
1582         if (r < 0){
1583                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1584                 goto out_put;
1585         }
1586         /* do not check return value. just make sure there is no node set */
1587         __set_copyup_node(mio, req, NULL);
1588         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1589         if (p == NoPort){
1590                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1591                 goto out_unset;
1592         }
1593         r = xseg_signal(peer->xseg, p);
1594         map->flags |= MF_MAP_DELETING;
1595         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1596         return req;
1597
1598 out_unset:
1599         xseg_get_req_data(peer->xseg, req, &dummy);
1600 out_put:
1601         xseg_put_request(peer->xseg, req, pr->portno);
1602 out_err:
1603         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1604         return  NULL;
1605 }
1606
1607
1608 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1609 {
1610         struct map_node *mn = find_object(map, index);
1611         if (mn)
1612                 mn->ref++;
1613         return mn;
1614 }
1615
1616 static inline void put_mapnode(struct map_node *mn)
1617 {
1618         mn->ref--;
1619         if (!mn->ref){
1620                 //clean up mn
1621                 st_cond_destroy(mn->cond);
1622         }
1623 }
1624
1625 static inline void __get_map(struct map *map)
1626 {
1627         map->ref++;
1628 }
1629
1630 static inline void put_map(struct map *map)
1631 {
1632         struct map_node *mn;
1633         XSEGLOG2(&lc, D, "Putting map %lx %s. ref %u", map, map->volume, map->ref);
1634         map->ref--;
1635         if (!map->ref){
1636                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1637                 //clean up map
1638                 uint64_t i;
1639                 for (i = 0; i < calc_map_obj(map); i++) {
1640                         mn = get_mapnode(map, i);
1641                         if (mn) {
1642                                 //make sure all pending operations on all objects are completed
1643                                 //this should never happen...
1644                                 if (mn->flags & MF_OBJECT_NOT_READY)
1645                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1646                                 mn->flags |= MF_OBJECT_DESTROYED;
1647                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1648                                 put_mapnode(mn); //matcing get_mapnode;
1649                                 //assert mn->ref == 0;
1650                         }
1651                 }
1652                 mn = find_object(map, 0);
1653                 if (mn)
1654                         free(mn);
1655                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1656                 free(map);
1657         }
1658 }
1659
1660 static struct map * create_map(struct mapperd *mapper, char *name,
1661                                 uint32_t namelen, uint32_t flags)
1662 {
1663         int r;
1664         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1665                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1666                                         namelen, MAX_VOLUME_LEN);
1667                 return NULL;
1668         }
1669         struct map *m = malloc(sizeof(struct map));
1670         if (!m){
1671                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1672                 goto out_err;
1673         }
1674         m->size = -1;
1675         if (flags & MF_ARCHIP){
1676                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1677                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1678                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1679                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1680                 m->version = 1; /* keep this hardcoded for now */
1681         }
1682         else {
1683                 strncpy(m->volume, name, namelen);
1684                 m->volume[namelen] = 0;
1685                 m->volumelen = namelen;
1686                 m->version = 0; /* version 0 should be pithos maps */
1687         }
1688         m->flags = 0;
1689         m->objects = xhash_new(3, INTEGER); 
1690         if (!m->objects){
1691                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1692                                 m->volume);
1693                 goto out_map;
1694         }
1695         m->ref = 1;
1696         m->waiters = 0;
1697         m->cond = st_cond_new(); //FIXME err check;
1698         r = insert_map(mapper, m);
1699         if (r < 0){
1700                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1701                 goto out_hash;
1702         }
1703
1704         return m;
1705
1706 out_hash:
1707         xhash_free(m->objects);
1708 out_map:
1709         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1710         free(m);
1711 out_err:
1712         return NULL;
1713 }
1714
1715
1716
1717 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1718 {
1719         struct peerd *peer = pr->peer;
1720         struct mapperd *mapper = __get_mapperd(peer);
1721         (void)mapper;
1722         struct mapper_io *mio = __get_mapper_io(pr);
1723         struct map_node *mn = __get_copyup_node(mio, req);
1724
1725         __set_copyup_node(mio, req, NULL);
1726
1727         //assert req->op = X_DELETE;
1728         //assert pr->req->op = X_DELETE only map deletions make delete requests
1729         //assert mio->del_pending > 0
1730         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1731         mio->del_pending--;
1732
1733         if (req->state & XS_FAILED){
1734                 mio->err = 1;
1735         }
1736         if (mn){
1737                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1738                                 mn, mn->object, mio, req);
1739                 // assert mn->flags & MF_OBJECT_DELETING
1740                 mn->flags &= ~MF_OBJECT_DELETING;
1741                 mn->flags |= MF_OBJECT_DESTROYED;
1742                 signal_mapnode(mn);
1743                 /* put mapnode here, matches get_mapnode on do_destroy */
1744                 put_mapnode(mn);
1745         } else {
1746                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1747                                 mio, req);
1748         }
1749         xseg_put_request(peer->xseg, req, pr->portno);
1750         signal_pr(pr);
1751 }
1752
1753 void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1754 {
1755         struct peerd *peer = pr->peer;
1756         struct mapperd *mapper = __get_mapperd(peer);
1757         (void)mapper;
1758         struct mapper_io *mio = __get_mapper_io(pr);
1759         struct map_node *mn = __get_copyup_node(mio, req);
1760         if (!mn){
1761                 XSEGLOG2(&lc, E, "Cannot get map node");
1762                 goto out_err;
1763         }
1764         __set_copyup_node(mio, req, NULL);
1765
1766         if (req->state & XS_FAILED){
1767                 if (req->op == X_DELETE){
1768                         XSEGLOG2(&lc, E, "Delete req failed");
1769                         goto out_ok;
1770                 }
1771                 XSEGLOG2(&lc, E, "Req failed");
1772                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1773                 mn->flags &= ~MF_OBJECT_WRITING;
1774                 goto out_err;
1775         }
1776
1777         if (req->op == X_WRITE) {
1778                 char old_object_name[MAX_OBJECT_LEN + 1];
1779                 uint32_t old_objectlen;
1780
1781                 char *target = xseg_get_target(peer->xseg, req);
1782                 (void)target;
1783                 //assert mn->flags & MF_OBJECT_WRITING
1784                 mn->flags &= ~MF_OBJECT_WRITING;
1785                 strncpy(old_object_name, mn->object, mn->objectlen);
1786                 old_objectlen = mn->objectlen;
1787
1788                 struct map_node tmp;
1789                 char *data = xseg_get_data(peer->xseg, req);
1790                 map_to_object(&tmp, (unsigned char *) data);
1791                 mn->flags &= ~MF_OBJECT_EXIST;
1792
1793                 strncpy(mn->object, tmp.object, tmp.objectlen);
1794                 mn->object[tmp.objectlen] = 0;
1795                 mn->objectlen = tmp.objectlen;
1796                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1797                 //signal_mapnode since Snapshot was successfull
1798                 signal_mapnode(mn);
1799
1800                 //do delete old object
1801                 strncpy(tmp.object, old_object_name, old_objectlen);
1802                 tmp.object[old_objectlen] = 0;
1803                 tmp.objectlen = old_objectlen;
1804                 tmp.flags = MF_OBJECT_EXIST;
1805                 struct xseg_request *xreq = __delete_object(pr, &tmp);
1806                 if (!xreq){
1807                         //just a warning. Snapshot was successfull
1808                         XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1809                         goto out_ok;
1810                 }
1811                 //overwrite copyup node, since tmp is a stack dummy variable
1812                 __set_copyup_node (mio, xreq, mn);
1813                 XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1814         } else if (req->op == X_SNAPSHOT) {
1815                 //issue write_object;
1816                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1817                 struct map *map = mn->map;
1818                 if (!map){
1819                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1820                         goto out_err;
1821                 }
1822
1823                 /* construct a tmp map_node for writing purposes */
1824                 //char *target = xseg_get_target(peer->xseg, req);
1825                 struct map_node newmn = *mn;
1826                 newmn.flags = 0;
1827                 struct xseg_reply_snapshot *xreply;
1828                 xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1829                 //assert xreply->targetlen !=0
1830                 //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1831                 //xreply->target[xreply->targetlen] = 0;
1832                 //assert xreply->target valid
1833                 strncpy(newmn.object, xreply->target, xreply->targetlen);
1834                 newmn.object[req->targetlen] = 0;
1835                 newmn.objectlen = req->targetlen;
1836                 newmn.objectidx = mn->objectidx;
1837                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1838                 if (!xreq){
1839                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1840                                         "\n\t of map %s [%llu]",
1841                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1842                         goto out_err;
1843                 }
1844                 mn->flags |= MF_OBJECT_WRITING;
1845                 __set_copyup_node (mio, xreq, mn);
1846
1847                 XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1848         } else if (req->op == X_DELETE){
1849                 //deletion of the old block completed
1850                 XSEGLOG2(&lc, I, "Deletion of completed");
1851                 goto out_ok;
1852                 ;
1853         } else {
1854                 //wtf??
1855                 ;
1856         }
1857
1858 out:
1859         xseg_put_request(peer->xseg, req, pr->portno);
1860         return;
1861
1862 out_err:
1863         mio->snap_pending--;
1864         XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1865         mio->err = 1;
1866         if (mn)
1867                 signal_mapnode(mn);
1868         signal_pr(pr);
1869         goto out;
1870
1871 out_ok:
1872         mio->snap_pending--;
1873         signal_pr(pr);
1874         goto out;
1875
1876
1877 }
1878 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1879 {
1880         struct peerd *peer = pr->peer;
1881         struct mapperd *mapper = __get_mapperd(peer);
1882         (void)mapper;
1883         struct mapper_io *mio = __get_mapper_io(pr);
1884         struct map_node *mn = __get_copyup_node(mio, req);
1885         if (!mn){
1886                 XSEGLOG2(&lc, E, "Cannot get map node");
1887                 goto out_err;
1888         }
1889         __set_copyup_node(mio, req, NULL);
1890
1891         if (req->state & XS_FAILED){
1892                 XSEGLOG2(&lc, E, "Req failed");
1893                 mn->flags &= ~MF_OBJECT_COPYING;
1894                 mn->flags &= ~MF_OBJECT_WRITING;
1895                 goto out_err;
1896         }
1897         if (req->op == X_WRITE) {
1898                 char *target = xseg_get_target(peer->xseg, req);
1899                 (void)target;
1900                 //printf("handle object write replyi\n");
1901                 __set_copyup_node(mio, req, NULL);
1902                 //assert mn->flags & MF_OBJECT_WRITING
1903                 mn->flags &= ~MF_OBJECT_WRITING;
1904
1905                 struct map_node tmp;
1906                 char *data = xseg_get_data(peer->xseg, req);
1907                 map_to_object(&tmp, (unsigned char *) data);
1908                 mn->flags |= MF_OBJECT_EXIST;
1909                 if (mn->flags != MF_OBJECT_EXIST){
1910                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1911                         goto out_err;
1912                 }
1913                 //assert mn->flags & MF_OBJECT_EXIST
1914                 strncpy(mn->object, tmp.object, tmp.objectlen);
1915                 mn->object[tmp.objectlen] = 0;
1916                 mn->objectlen = tmp.objectlen;
1917                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1918                 mio->copyups--;
1919                 signal_mapnode(mn);
1920                 signal_pr(pr);
1921         } else if (req->op == X_COPY) {
1922         //      issue write_object;
1923                 mn->flags &= ~MF_OBJECT_COPYING;
1924                 struct map *map = mn->map;
1925                 if (!map){
1926                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1927                         goto out_err;
1928                 }
1929
1930                 /* construct a tmp map_node for writing purposes */
1931                 char *target = xseg_get_target(peer->xseg, req);
1932                 struct map_node newmn = *mn;
1933                 newmn.flags = MF_OBJECT_EXIST;
1934                 strncpy(newmn.object, target, req->targetlen);
1935                 newmn.object[req->targetlen] = 0;
1936                 newmn.objectlen = req->targetlen;
1937                 newmn.objectidx = mn->objectidx; 
1938                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1939                 if (!xreq){
1940                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1941                                         "\n\t of map %s [%llu]",
1942                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1943                         goto out_err;
1944                 }
1945                 mn->flags |= MF_OBJECT_WRITING;
1946                 __set_copyup_node (mio, xreq, mn);
1947
1948                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1949         } else {
1950                 //wtf??
1951                 ;
1952         }
1953
1954 out:
1955         xseg_put_request(peer->xseg, req, pr->portno);
1956         return;
1957
1958 out_err:
1959         mio->copyups--;
1960         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1961         mio->err = 1;
1962         if (mn)
1963                 signal_mapnode(mn);
1964         signal_pr(pr);
1965         goto out;
1966
1967 }
1968
1969 struct r2o {
1970         struct map_node *mn;
1971         uint64_t offset;
1972         uint64_t size;
1973 };
1974
1975 static int req2objs(struct peer_req *pr, struct map *map, int write)
1976 {
1977         int r = 0;
1978         struct peerd *peer = pr->peer;
1979         struct mapper_io *mio = __get_mapper_io(pr);
1980         char *target = xseg_get_target(peer->xseg, pr->req);
1981         uint32_t nr_objs = calc_nr_obj(pr->req);
1982         uint64_t size = sizeof(struct xseg_reply_map) + 
1983                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1984         uint32_t idx, i;
1985         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1986         struct map_node *mn;
1987         mio->copyups = 0;
1988         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1989
1990         /* get map_nodes of request */
1991         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1992         if (!mns){
1993                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1994                 return -1;
1995         }
1996         idx = 0;
1997         rem_size = pr->req->size;
1998         obj_index = pr->req->offset / block_size;
1999         obj_offset = pr->req->offset & (block_size -1); //modulo
2000         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
2001         mn = get_mapnode(map, obj_index);
2002         if (!mn) {
2003                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2004                 r = -1;
2005                 goto out;
2006         }
2007         mns[idx].mn = mn;
2008         mns[idx].offset = obj_offset;
2009         mns[idx].size = obj_size;
2010         rem_size -= obj_size;
2011         while (rem_size > 0) {
2012                 idx++;
2013                 obj_index++;
2014                 obj_offset = 0;
2015                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
2016                 rem_size -= obj_size;
2017                 mn = get_mapnode(map, obj_index);
2018                 if (!mn) {
2019                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2020                         r = -1;
2021                         goto out;
2022                 }
2023                 mns[idx].mn = mn;
2024                 mns[idx].offset = obj_offset;
2025                 mns[idx].size = obj_size;
2026         }
2027         if (write) {
2028                 int can_wait = 0;
2029                 mio->cb=copyup_cb;
2030                 /* do a first scan and issue as many copyups as we can.
2031                  * then retry and wait when an object is not ready.
2032                  * this could be done better, since now we wait also on the
2033                  * pending copyups
2034                  */
2035                 int j;
2036                 for (j = 0; j < 2 && !mio->err; j++) {
2037                         for (i = 0; i < (idx+1); i++) {
2038                                 mn = mns[i].mn;
2039                                 //do copyups
2040                                 if (mn->flags & MF_OBJECT_NOT_READY){
2041                                         if (!can_wait)
2042                                                 continue;
2043                                         if (mn->flags & MF_OBJECT_NOT_READY)
2044                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2045                                         if (mn->flags & MF_OBJECT_DESTROYED){
2046                                                 mio->err = 1;
2047                                                 continue;
2048                                         }
2049                                 }
2050
2051                                 if (!(mn->flags & MF_OBJECT_EXIST)) {
2052                                         //calc new_target, copy up object
2053                                         if (copyup_object(peer, mn, pr) == NULL){
2054                                                 XSEGLOG2(&lc, E, "Error in copy up object");
2055                                                 mio->err = 1;
2056                                         } else {
2057                                                 mio->copyups++;
2058                                         }
2059                                 }
2060
2061                                 if (mio->err){
2062                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2063                                         break;
2064                                 }
2065                         }
2066                         can_wait = 1;
2067                 }
2068                 if (mio->copyups > 0)
2069                         wait_on_pr(pr, mio->copyups > 0);
2070         }
2071
2072         if (mio->err){
2073                 r = -1;
2074                 XSEGLOG2(&lc, E, "Mio->err");
2075                 goto out;
2076         }
2077
2078         /* resize request to fit reply */
2079         char buf[XSEG_MAX_TARGETLEN];
2080         strncpy(buf, target, pr->req->targetlen);
2081         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2082         if (r < 0) {
2083                 XSEGLOG2(&lc, E, "Cannot resize request");
2084                 goto out;
2085         }
2086         target = xseg_get_target(peer->xseg, pr->req);
2087         strncpy(target, buf, pr->req->targetlen);
2088
2089         /* structure reply */
2090         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2091         reply->cnt = nr_objs;
2092         for (i = 0; i < (idx+1); i++) {
2093                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2094                 reply->segs[i].targetlen = mns[i].mn->objectlen;
2095                 reply->segs[i].offset = mns[i].offset;
2096                 reply->segs[i].size = mns[i].size;
2097         }
2098 out:
2099         for (i = 0; i < idx; i++) {
2100                 put_mapnode(mns[i].mn);
2101         }
2102         free(mns);
2103         mio->cb = NULL;
2104         return r;
2105 }
2106
2107 static int do_dropcache(struct peer_req *pr, struct map *map)
2108 {
2109         struct map_node *mn;
2110         struct peerd *peer = pr->peer;
2111         struct mapperd *mapper = __get_mapperd(peer);
2112         uint64_t i;
2113         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2114         map->flags |= MF_MAP_DROPPING_CACHE;
2115         for (i = 0; i < calc_map_obj(map); i++) {
2116                 mn = get_mapnode(map, i);
2117                 if (mn) {
2118                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
2119                                 //make sure all pending operations on all objects are completed
2120                                 if (mn->flags & MF_OBJECT_NOT_READY)
2121                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2122                                 mn->flags |= MF_OBJECT_DESTROYED;
2123                         }
2124                         put_mapnode(mn);
2125                 }
2126         }
2127         map->flags &= ~MF_MAP_DROPPING_CACHE;
2128         map->flags |= MF_MAP_DESTROYED;
2129         remove_map(mapper, map);
2130         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2131         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
2132         return 0;
2133 }
2134
2135 static int do_info(struct peer_req *pr, struct map *map)
2136 {
2137         struct peerd *peer = pr->peer;
2138         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2139         xinfo->size = map->size;
2140         return 0;
2141 }
2142
2143
2144 static int do_open(struct peer_req *pr, struct map *map)
2145 {
2146         if (map->flags & MF_MAP_EXCLUSIVE){
2147                 return 0;
2148         }
2149         else {
2150                 return -1;
2151         }
2152 }
2153
2154 static int do_close(struct peer_req *pr, struct map *map)
2155 {
2156         if (map->flags & MF_MAP_EXCLUSIVE){
2157                 /* do not drop cache if close failed and map not deleted */
2158                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2159                         return -1;
2160         }
2161         return do_dropcache(pr, map);
2162 }
2163
2164 static int do_snapshot(struct peer_req *pr, struct map *map)
2165 {
2166         uint64_t i;
2167         struct peerd *peer = pr->peer;
2168         struct mapper_io *mio = __get_mapper_io(pr);
2169         struct map_node *mn;
2170         struct xseg_request *req;
2171
2172         if (!(map->flags & MF_MAP_EXCLUSIVE)){
2173                 XSEGLOG2(&lc, E, "Map was not opened exclusively");
2174                 return -1;
2175         }
2176         XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2177         map->flags |= MF_MAP_SNAPSHOTTING;
2178
2179         uint64_t nr_obj = calc_map_obj(map);
2180         mio->cb = snapshot_cb;
2181         mio->snap_pending = 0;
2182         mio->err = 0;
2183         for (i = 0; i < nr_obj; i++){
2184
2185                 /* throttle pending snapshots
2186                  * this should be nr_ops of the blocker, but since we don't know
2187                  * that, we assume based on our own nr_ops
2188                  */
2189                 if (mio->snap_pending >= peer->nr_ops)
2190                         wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2191
2192                 mn = get_mapnode(map, i);
2193                 if (!mn)
2194                         //warning?
2195                         continue;
2196                 if (!(mn->flags & MF_OBJECT_EXIST)){
2197                         put_mapnode(mn);
2198                         continue;
2199                 }
2200                 // make sure all pending operations on all objects are completed
2201                 if (mn->flags & MF_OBJECT_NOT_READY)
2202                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2203
2204                 /* TODO will this ever happen?? */
2205                 if (mn->flags & MF_OBJECT_DESTROYED){
2206                         put_mapnode(mn);
2207                         continue;
2208                 }
2209
2210                 req = __snapshot_object(pr, mn);
2211                 if (!req){
2212                         mio->err = 1;
2213                         put_mapnode(mn);
2214                         break;
2215                 }
2216                 mio->snap_pending++;
2217                 /* do not put_mapnode here. cb does that */
2218         }
2219
2220         if (mio->snap_pending > 0)
2221                 wait_on_pr(pr, mio->snap_pending > 0);
2222         mio->cb = NULL;
2223
2224         if (mio->err)
2225                 goto out_err;
2226
2227         /* calculate name of snapshot */
2228         struct map tmp_map = *map;
2229         unsigned char sha[SHA256_DIGEST_SIZE];
2230         unsigned char *buf = malloc(block_size);
2231         char newvolumename[MAX_VOLUME_LEN];
2232         uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2233         uint64_t pos = 0;
2234         uint64_t max_objidx = calc_map_obj(map);
2235         int r;
2236
2237         for (i = 0; i < max_objidx; i++) {
2238                 mn = find_object(map, i);
2239                 if (!mn){
2240                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2241                                         (unsigned long long) i, map->volume);
2242                         goto out_err;
2243                 }
2244                 v0_object_to_map(mn, buf+pos);
2245                 pos += v0_objectsize_in_map;
2246         }
2247 //      SHA256(buf, pos, sha);
2248         merkle_hash(buf, pos, sha);
2249         hexlify(sha, newvolumename);
2250         strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2251         tmp_map.volumelen = newvolumenamelen;
2252         free(buf);
2253         tmp_map.version = 0; // set volume version to pithos image
2254
2255         /* write the map of the Snapshot */
2256         r = write_map(pr, &tmp_map);
2257         if (r < 0)
2258                 goto out_err;
2259         char targetbuf[XSEG_MAX_TARGETLEN];
2260         char *target = xseg_get_target(peer->xseg, pr->req);
2261         strncpy(targetbuf, target, pr->req->targetlen);
2262         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2263                         sizeof(struct xseg_reply_snapshot));
2264         if (r < 0){
2265                 XSEGLOG2(&lc, E, "Cannot resize request");
2266                 goto out_err;
2267         }
2268         target = xseg_get_target(peer->xseg, pr->req);
2269         strncpy(target, targetbuf, pr->req->targetlen);
2270
2271         struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2272                                                 xseg_get_data(peer->xseg, pr->req);
2273         strncpy(xreply->target, newvolumename, newvolumenamelen);
2274         xreply->targetlen = newvolumenamelen;
2275         map->flags &= ~MF_MAP_SNAPSHOTTING;
2276         XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2277         return 0;
2278
2279 out_err:
2280         map->flags &= ~MF_MAP_SNAPSHOTTING;
2281         XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2282         return -1;
2283 }
2284
2285
2286 static int do_destroy(struct peer_req *pr, struct map *map)
2287 {
2288         uint64_t i;
2289         struct peerd *peer = pr->peer;
2290         struct mapper_io *mio = __get_mapper_io(pr);
2291         struct map_node *mn;
2292         struct xseg_request *req;
2293
2294         if (!(map->flags & MF_MAP_EXCLUSIVE))
2295                 return -1;
2296
2297         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
2298         req = __delete_map(pr, map);
2299         if (!req)
2300                 return -1;
2301         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
2302         if (req->state & XS_FAILED){
2303                 xseg_put_request(peer->xseg, req, pr->portno);
2304                 map->flags &= ~MF_MAP_DELETING;
2305                 return -1;
2306         }
2307         xseg_put_request(peer->xseg, req, pr->portno);
2308
2309         uint64_t nr_obj = calc_map_obj(map);
2310         mio->cb = deletion_cb;
2311         mio->del_pending = 0;
2312         mio->err = 0;
2313         for (i = 0; i < nr_obj; i++){
2314
2315                 /* throttle pending deletions
2316                  * this should be nr_ops of the blocker, but since we don't know
2317                  * that, we assume based on our own nr_ops
2318                  */
2319                 if (mio->del_pending >= peer->nr_ops)
2320                         wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
2321
2322                 mn = get_mapnode(map, i);
2323                 if (!mn)
2324                         continue;
2325                 if (mn->flags & MF_OBJECT_DESTROYED){
2326                         put_mapnode(mn);
2327                         continue;
2328                 }
2329                 if (!(mn->flags & MF_OBJECT_EXIST)){
2330                         mn->flags |= MF_OBJECT_DESTROYED;
2331                         put_mapnode(mn);
2332                         continue;
2333                 }
2334
2335                 // make sure all pending operations on all objects are completed
2336                 if (mn->flags & MF_OBJECT_NOT_READY)
2337                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2338
2339                 req = __delete_object(pr, mn);
2340                 if (!req){
2341                         mio->err = 1;
2342                         put_mapnode(mn);
2343                         continue;
2344                 }
2345                 mio->del_pending++;
2346                 /* do not put_mapnode here. cb does that */
2347         }
2348
2349         if (mio->del_pending > 0)
2350                 wait_on_pr(pr, mio->del_pending > 0);
2351
2352         mio->cb = NULL;
2353         map->flags &= ~MF_MAP_DELETING;
2354         map->flags |= MF_MAP_DELETED;
2355         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
2356         return do_close(pr, map);
2357 }
2358
2359 static int do_mapr(struct peer_req *pr, struct map *map)
2360 {
2361         struct peerd *peer = pr->peer;
2362         int r = req2objs(pr, map, 0);
2363         if  (r < 0){
2364                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2365                                 map->volume, 
2366                                 (unsigned long long) pr->req->offset, 
2367                                 (unsigned long long) (pr->req->offset + pr->req->size));
2368                 return -1;
2369         }
2370         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2371                         map->volume, 
2372                         (unsigned long long) pr->req->offset, 
2373                         (unsigned long long) (pr->req->offset + pr->req->size));
2374         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2375                         (unsigned long long) pr->req->offset,
2376                         (unsigned long long) pr->req->size);
2377         char buf[XSEG_MAX_TARGETLEN+1];
2378         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2379         int i;
2380         for (i = 0; i < reply->cnt; i++) {
2381                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2382                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2383                 buf[reply->segs[i].targetlen] = 0;
2384                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2385                                 (unsigned long long) reply->segs[i].offset,
2386                                 (unsigned long long) reply->segs[i].size);
2387         }
2388         return 0;
2389 }
2390
2391 static int do_mapw(struct peer_req *pr, struct map *map)
2392 {
2393         struct peerd *peer = pr->peer;
2394         int r = req2objs(pr, map, 1);
2395         if  (r < 0){
2396                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2397                                 map->volume, 
2398                                 (unsigned long long) pr->req->offset, 
2399                                 (unsigned long long) (pr->req->offset + pr->req->size));
2400                 return -1;
2401         }
2402         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2403                         map->volume, 
2404                         (unsigned long long) pr->req->offset, 
2405                         (unsigned long long) (pr->req->offset + pr->req->size));
2406         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2407                         (unsigned long long) pr->req->offset,
2408                         (unsigned long long) pr->req->size);
2409         char buf[XSEG_MAX_TARGETLEN+1];
2410         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2411         int i;
2412         for (i = 0; i < reply->cnt; i++) {
2413                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2414                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2415                 buf[reply->segs[i].targetlen] = 0;
2416                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2417                                 (unsigned long long) reply->segs[i].offset,
2418                                 (unsigned long long) reply->segs[i].size);
2419         }
2420         return 0;
2421 }
2422
2423 //here map is the parent map
2424 static int do_clone(struct peer_req *pr, struct map *map)
2425 {
2426         int r;
2427         struct peerd *peer = pr->peer;
2428         struct mapperd *mapper = __get_mapperd(peer);
2429         char *target = xseg_get_target(peer->xseg, pr->req);
2430         struct map *clonemap;
2431         struct xseg_request_clone *xclone =
2432                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2433
2434         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
2435
2436         clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2437         if (!clonemap)
2438                 return -1;
2439
2440         /* open map to get exclusive access to map */
2441         r = open_map(pr, clonemap, 0);
2442         if (r < 0){
2443                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
2444                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2445                 goto out_err;
2446         }
2447         r = load_map(pr, clonemap);
2448         if (r >= 0) {
2449                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2450                 goto out_err;
2451         }
2452
2453         if (xclone->size == -1)
2454                 clonemap->size = map->size;
2455         else
2456                 clonemap->size = xclone->size;
2457         if (clonemap->size < map->size){
2458                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
2459                                 "\n\t for requested clone %s",
2460                                 (unsigned long long) xclone->size,
2461                                 (unsigned long long) map->size, clonemap->volume);
2462                 goto out_err;
2463         }
2464         if (clonemap->size > MAX_VOLUME_SIZE) {
2465                 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
2466                                 "\n\t for volume %s",
2467                                 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
2468                 goto out_err;
2469         }
2470
2471         //alloc and init map_nodes
2472         //unsigned long c = clonemap->size/block_size + 1;
2473         unsigned long c = calc_map_obj(clonemap);
2474         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
2475         if (!map_nodes){
2476                 goto out_err;
2477         }
2478         int i;
2479         //for (i = 0; i < clonemap->size/block_size + 1; i++) {
2480         for (i = 0; i < c; i++) {
2481                 struct map_node *mn = get_mapnode(map, i);
2482                 if (mn) {
2483                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
2484                         map_nodes[i].objectlen = mn->objectlen;
2485                         put_mapnode(mn);
2486                 } else {
2487                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2488                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2489                 }
2490                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2491                 map_nodes[i].flags = 0;
2492                 map_nodes[i].objectidx = i;
2493                 map_nodes[i].map = clonemap;
2494                 map_nodes[i].ref = 1;
2495                 map_nodes[i].waiters = 0;
2496                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2497                 r = insert_object(clonemap, &map_nodes[i]);
2498                 if (r < 0){
2499                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
2500                         goto out_err;
2501                 }
2502         }
2503
2504         r = write_map(pr, clonemap);
2505         if (r < 0){
2506                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
2507                 goto out_err;
2508         }
2509         do_close(pr, clonemap);
2510         return 0;
2511
2512 out_err:
2513         do_close(pr, clonemap);
2514         return -1;
2515 }
2516
2517 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2518 {
2519         int r, opened = 0;
2520         if (flags & MF_EXCLUSIVE){
2521                 r = open_map(pr, map, flags);
2522                 if (r < 0) {
2523                         if (flags & MF_FORCE){
2524                                 return -1;
2525                         }
2526                 } else {
2527                         opened = 1;
2528                 }
2529         }
2530         r = load_map(pr, map);
2531         if (r < 0 && opened){
2532                 close_map(pr, map);
2533         }
2534         return r;
2535 }
2536
2537 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2538                         uint32_t flags)
2539 {
2540         int r;
2541         struct peerd *peer = pr->peer;
2542         struct mapperd *mapper = __get_mapperd(peer);
2543         struct map *map = find_map_len(mapper, name, namelen, flags);
2544         if (!map){
2545                 if (flags & MF_LOAD){
2546                         map = create_map(mapper, name, namelen, flags);
2547                         if (!map)
2548                                 return NULL;
2549                         __get_map(map);
2550                         r = open_load_map(pr, map, flags);
2551                         if (r < 0){
2552                                 do_dropcache(pr, map);
2553                                 /* signal map here, so any other threads that
2554                                  * tried to get the map, but couldn't because
2555                                  * of the opening or loading operation that
2556                                  * failed, can continue.
2557                                  */
2558                                 signal_map(map);
2559                                 put_map(map);
2560                                 return NULL;
2561                         }
2562                         return map;
2563                 } else {
2564                         return NULL;
2565                 }
2566         } else if (map->flags & MF_MAP_DESTROYED){
2567                 return NULL;
2568         } else {
2569                 __get_map(map);
2570         }
2571         return map;
2572
2573 }
2574
2575 static int map_action(int (action)(struct peer_req *pr, struct map *map),
2576                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2577 {
2578         //struct peerd *peer = pr->peer;
2579         struct map *map;
2580 start:
2581         map = get_map(pr, name, namelen, flags);
2582         if (!map)
2583                 return -1;
2584         if (map->flags & MF_MAP_NOT_READY){
2585                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2586                 put_map(map);
2587                 goto start;
2588         }
2589         int r = action(pr, map);
2590         //always drop cache if map not read exclusively
2591         if (!(map->flags & MF_MAP_EXCLUSIVE))
2592                 do_dropcache(pr, map);
2593         signal_map(map);
2594         put_map(map);
2595         return r;
2596 }
2597
2598 void * handle_info(struct peer_req *pr)
2599 {
2600         struct peerd *peer = pr->peer;
2601         char *target = xseg_get_target(peer->xseg, pr->req);
2602         int r = map_action(do_info, pr, target, pr->req->targetlen,
2603                                 MF_ARCHIP|MF_LOAD);
2604         if (r < 0)
2605                 fail(peer, pr);
2606         else
2607                 complete(peer, pr);
2608         ta--;
2609         return NULL;
2610 }
2611
2612 void * handle_clone(struct peer_req *pr)
2613 {
2614         int r;
2615         struct peerd *peer = pr->peer;
2616         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2617         if (!xclone) {
2618                 r = -1;
2619                 goto out;
2620         }
2621
2622         if (xclone->targetlen){
2623                 /* if snap was defined */
2624                 //support clone only from pithos
2625                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
2626                                         MF_LOAD);
2627         } else {
2628                 /* else try to create a new volume */
2629                 XSEGLOG2(&lc, I, "Creating volume");
2630                 if (!xclone->size){
2631                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
2632                         r = -1;
2633                         goto out;
2634                 }
2635                 if (xclone->size > MAX_VOLUME_SIZE) {
2636                         XSEGLOG2(&lc, E, "Requested size %llu > max volume "
2637                                         "size %llu", xclone->size, MAX_VOLUME_SIZE);
2638                         r = -1;
2639                         goto out;
2640                 }
2641
2642                 struct map *map;
2643                 char *target = xseg_get_target(peer->xseg, pr->req);
2644
2645                 //create a new empty map of size
2646                 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2647                 if (!map){
2648                         r = -1;
2649                         goto out;
2650                 }
2651                 /* open map to get exclusive access to map */
2652                 r = open_map(pr, map, 0);
2653                 if (r < 0){
2654                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
2655                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
2656                         do_dropcache(pr, map);
2657                         r = -1;
2658                         goto out;
2659                 }
2660                 r = load_map(pr, map);
2661                 if (r >= 0) {
2662                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
2663                         do_close(pr, map);
2664                         r = -1;
2665                         goto out;
2666                 }
2667                 map->size = xclone->size;
2668                 //populate_map with zero objects;
2669                 uint64_t nr_objs = xclone->size / block_size;
2670                 if (xclone->size % block_size)
2671                         nr_objs++;
2672
2673                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2674                 if (!map_nodes){
2675                         do_close(pr, map);
2676                         r = -1;
2677                         goto out;
2678                 }
2679
2680                 uint64_t i;
2681                 for (i = 0; i < nr_objs; i++) {
2682                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2683                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2684                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2685                         map_nodes[i].flags = 0;
2686                         map_nodes[i].objectidx = i;
2687                         map_nodes[i].map = map;
2688                         map_nodes[i].ref = 1;
2689                         map_nodes[i].waiters = 0;
2690                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2691                         r = insert_object(map, &map_nodes[i]);
2692                         if (r < 0){
2693                                 do_close(pr, map);
2694                                 r = -1;
2695                                 goto out;
2696                         }
2697                 }
2698                 r = write_map(pr, map);
2699                 if (r < 0){
2700                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2701                         do_close(pr, map);
2702                         goto out;
2703                 }
2704                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2705                 r = 0;
2706                 do_close(pr, map); //drop cache here for consistency
2707         }
2708 out:
2709         if (r < 0)
2710                 fail(peer, pr);
2711         else
2712                 complete(peer, pr);
2713         ta--;
2714         return NULL;
2715 }
2716
2717 void * handle_mapr(struct peer_req *pr)
2718 {
2719         struct peerd *peer = pr->peer;
2720         char *target = xseg_get_target(peer->xseg, pr->req);
2721         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2722                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2723         if (r < 0)
2724                 fail(peer, pr);
2725         else
2726                 complete(peer, pr);
2727         ta--;
2728         return NULL;
2729 }
2730
2731 void * handle_mapw(struct peer_req *pr)
2732 {
2733         struct peerd *peer = pr->peer;
2734         char *target = xseg_get_target(peer->xseg, pr->req);
2735         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2736                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2737         if (r < 0)
2738                 fail(peer, pr);
2739         else
2740                 complete(peer, pr);
2741         XSEGLOG2(&lc, D, "Ta: %d", ta);
2742         ta--;
2743         return NULL;
2744 }
2745
2746 void * handle_destroy(struct peer_req *pr)
2747 {
2748         struct peerd *peer = pr->peer;
2749         char *target = xseg_get_target(peer->xseg, pr->req);
2750         /* request EXCLUSIVE access, but do not force it.
2751          * check if succeeded on do_destroy
2752          */
2753         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2754                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2755         if (r < 0)
2756                 fail(peer, pr);
2757         else
2758                 complete(peer, pr);
2759         ta--;
2760         return NULL;
2761 }
2762
2763 void * handle_open(struct peer_req *pr)
2764 {
2765         struct peerd *peer = pr->peer;
2766         char *target = xseg_get_target(peer->xseg, pr->req);
2767         //here we do not want to load
2768         int r = map_action(do_open, pr, target, pr->req->targetlen,
2769                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2770         if (r < 0)
2771                 fail(peer, pr);
2772         else
2773                 complete(peer, pr);
2774         ta--;
2775         return NULL;
2776 }
2777
2778 void * handle_close(struct peer_req *pr)
2779 {
2780         struct peerd *peer = pr->peer;
2781         char *target = xseg_get_target(peer->xseg, pr->req);
2782         //here we do not want to load
2783         int r = map_action(do_close, pr, target, pr->req->targetlen,
2784                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2785         if (r < 0)
2786                 fail(peer, pr);
2787         else
2788                 complete(peer, pr);
2789         ta--;
2790         return NULL;
2791 }
2792
2793 void * handle_snapshot(struct peer_req *pr)
2794 {
2795         struct peerd *peer = pr->peer;
2796         char *target = xseg_get_target(peer->xseg, pr->req);
2797         /* request EXCLUSIVE access, but do not force it.
2798          * check if succeeded on do_snapshot
2799          */
2800         int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2801                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2802         if (r < 0)
2803                 fail(peer, pr);
2804         else
2805                 complete(peer, pr);
2806         ta--;
2807         return NULL;
2808 }
2809
2810 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2811                         struct xseg_request *req)
2812 {
2813         //struct mapperd *mapper = __get_mapperd(peer);
2814         struct mapper_io *mio = __get_mapper_io(pr);
2815         void *(*action)(struct peer_req *) = NULL;
2816
2817         mio->state = ACCEPTED;
2818         mio->err = 0;
2819         mio->cb = NULL;
2820         switch (pr->req->op) {
2821                 /* primary xseg operations of mapper */
2822                 case X_CLONE: action = handle_clone; break;
2823                 case X_MAPR: action = handle_mapr; break;
2824                 case X_MAPW: action = handle_mapw; break;
2825                 case X_SNAPSHOT: action = handle_snapshot; break;
2826                 case X_INFO: action = handle_info; break;
2827                 case X_DELETE: action = handle_destroy; break;
2828                 case X_OPEN: action = handle_open; break;
2829                 case X_CLOSE: action = handle_close; break;
2830                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2831         }
2832         if (action){
2833                 ta++;
2834                 mio->active = 1;
2835                 st_thread_create(action, pr, 0, 0);
2836         }
2837         return 0;
2838
2839 }
2840
2841 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2842                 enum dispatch_reason reason)
2843 {
2844         struct mapperd *mapper = __get_mapperd(peer);
2845         (void) mapper;
2846         struct mapper_io *mio = __get_mapper_io(pr);
2847         (void) mio;
2848
2849
2850         if (reason == dispatch_accept)
2851                 dispatch_accepted(peer, pr, req);
2852         else {
2853                 if (mio->cb){
2854                         mio->cb(pr, req);
2855                 } else { 
2856                         signal_pr(pr);
2857                 }
2858         }
2859         return 0;
2860 }
2861
2862 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2863 {
2864         int i;
2865
2866         //FIXME error checks
2867         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2868         peer->priv = mapperd;
2869         mapper = mapperd;
2870         mapper->hashmaps = xhash_new(3, STRING);
2871
2872         for (i = 0; i < peer->nr_ops; i++) {
2873                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2874                 mio->copyups_nodes = xhash_new(3, INTEGER);
2875                 mio->copyups = 0;
2876                 mio->err = 0;
2877                 mio->active = 0;
2878                 peer->peer_reqs[i].priv = mio;
2879         }
2880
2881         mapper->bportno = -1;
2882         mapper->mbportno = -1;
2883         BEGIN_READ_ARGS(argc, argv);
2884         READ_ARG_ULONG("-bp", mapper->bportno);
2885         READ_ARG_ULONG("-mbp", mapper->mbportno);
2886         END_READ_ARGS();
2887         if (mapper->bportno == -1){
2888                 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2889                 usage(argv[0]);
2890                 return -1;
2891         }
2892         if (mapper->mbportno == -1){
2893                 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2894                 usage(argv[0]);
2895                 return -1;
2896         }
2897
2898         const struct sched_param param = { .sched_priority = 99 };
2899         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2900         /* FIXME maybe place it in peer
2901          * should be done for each port (sportno to eportno)
2902          */
2903         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2904         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2905
2906
2907 //      test_map(peer);
2908
2909         return 0;
2910 }
2911
2912 /* FIXME this should not be here */
2913 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2914 {
2915         struct xseg *xseg = peer->xseg;
2916         xport portno_start = peer->portno_start;
2917         xport portno_end = peer->portno_end;
2918         struct peer_req *pr;
2919         xport i;
2920         int  r, c = 0;
2921         struct xseg_request *received;
2922         xseg_prepare_wait(xseg, portno_start);
2923         while(1) {
2924                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2925                 c = 1;
2926                 while (c){
2927                         c = 0;
2928                         for (i = portno_start; i <= portno_end; i++) {
2929                                 received = xseg_receive(xseg, i, 0);
2930                                 if (received) {
2931                                         c = 1;
2932                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2933                                         if (r < 0 || !pr || received != expected_req){
2934                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2935                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2936                                                 if (p == NoPort){
2937                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2938                                                         xseg_put_request(xseg, received, portno_start);
2939                                                         continue;
2940                                                 } else {
2941                                                         xseg_signal(xseg, p);
2942                                                 }
2943                                         } else {
2944                                                 xseg_cancel_wait(xseg, portno_start);
2945                                                 return 0;
2946                                         }
2947                                 }
2948                         }
2949                 }
2950                 xseg_wait_signal(xseg, 1000000UL);
2951         }
2952 }
2953
2954
2955 void custom_peer_finalize(struct peerd *peer)
2956 {
2957         struct mapperd *mapper = __get_mapperd(peer);
2958         struct peer_req *pr = alloc_peer_req(peer);
2959         if (!pr){
2960                 XSEGLOG2(&lc, E, "Cannot get peer request");
2961                 return;
2962         }
2963         struct map *map;
2964         struct xseg_request *req;
2965         xhash_iter_t it;
2966         xhashidx key, val;
2967         xhash_iter_init(mapper->hashmaps, &it);
2968         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2969                 map = (struct map *)val;
2970                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2971                         continue;
2972                 req = __close_map(pr, map);
2973                 if (!req)
2974                         continue;
2975                 wait_reply(peer, req);
2976                 if (!(req->state & XS_SERVED))
2977                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2978                 map->flags &= ~MF_MAP_CLOSING;
2979                 xseg_put_request(peer->xseg, req, pr->portno);
2980         }
2981         return;
2982
2983
2984 }
2985
2986 void print_obj(struct map_node *mn)
2987 {
2988         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2989                         (unsigned long long) mn->objectidx, mn->object, 
2990                         (unsigned int) mn->objectlen, 
2991                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2992 }
2993
2994 void print_map(struct map *m)
2995 {
2996         uint64_t nr_objs = m->size/block_size;
2997         if (m->size % block_size)
2998                 nr_objs++;
2999         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
3000                         m->volume, m->volumelen, 
3001                         (unsigned long long) m->size, 
3002                         (unsigned long long) nr_objs,
3003                         m->version);
3004         uint64_t i;
3005         struct map_node *mn;
3006         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
3007                 return;
3008         for (i = 0; i < nr_objs; i++) {
3009                 mn = find_object(m, i);
3010                 if (!mn){
3011                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
3012                         continue;
3013                 }
3014                 print_obj(mn);
3015         }
3016 }
3017
3018 /*
3019 void test_map(struct peerd *peer)
3020 {
3021         int i,j, ret;
3022         //struct sha256_ctx sha256ctx;
3023         unsigned char buf[SHA256_DIGEST_SIZE];
3024         char buf_new[XSEG_MAX_TARGETLEN + 20];
3025         struct map *m = malloc(sizeof(struct map));
3026         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
3027         m->volume[XSEG_MAX_TARGETLEN] = 0;
3028         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
3029         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
3030         m->volumelen = XSEG_MAX_TARGETLEN;
3031         m->size = 100*block_size;
3032         m->objects = xhash_new(3, INTEGER);
3033         struct map_node *map_node = calloc(100, sizeof(struct map_node));
3034         for (i = 0; i < 100; i++) {
3035                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
3036                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
3037                 
3038                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
3039                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
3040                 }
3041                 map_node[i].objectidx = i;
3042                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
3043                 map_node[i].flags = MF_OBJECT_EXIST;
3044                 ret = insert_object(m, &map_node[i]);
3045         }
3046
3047         char *data = malloc(block_size);
3048         mapheader_to_map(m, data);
3049         uint64_t pos = mapheader_size;
3050
3051         for (i = 0; i < 100; i++) {
3052                 map_node = find_object(m, i);
3053                 if (!map_node){
3054                         printf("no object node %d \n", i);
3055                         exit(1);
3056                 }
3057                 object_to_map(data+pos, map_node);
3058                 pos += objectsize_in_map;
3059         }
3060 //      print_map(m);
3061
3062         struct map *m2 = malloc(sizeof(struct map));
3063         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3064         m->volume[XSEG_MAX_TARGETLEN] = 0;
3065         m->volumelen = XSEG_MAX_TARGETLEN;
3066
3067         m2->objects = xhash_new(3, INTEGER);
3068         ret = read_map(peer, m2, data);
3069 //      print_map(m2);
3070
3071         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3072         ssize_t r, sum = 0;
3073         while (sum < block_size) {
3074                 r = write(fd, data + sum, block_size -sum);
3075                 if (r < 0){
3076                         perror("write");
3077                         printf("write error\n");
3078                         exit(1);
3079                 } 
3080                 sum += r;
3081         }
3082         close(fd);
3083         map_node = find_object(m, 0);
3084         free(map_node);
3085         free(m);
3086 }
3087 */