mapperd: Always sleep before checking request state.
[archipelago] / xseg / peers / user / mt-mapperd.c
1 /*
2  * Copyright 2012 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <stdio.h>
36 #include <unistd.h>
37 #include <sys/types.h>
38 #include <pthread.h>
39 #include <xseg/xseg.h>
40 #include <peer.h>
41 #include <time.h>
42 #include <xtypes/xlock.h>
43 #include <xtypes/xhash.h>
44 #include <xseg/protocol.h>
45 #include <sys/stat.h>
46 #include <fcntl.h>
47 #include <errno.h>
48 #include <sched.h>
49 #include <sys/syscall.h>
50 #include <openssl/sha.h>
51 #include <ctype.h>
52
53 /* general mapper flags */
54 #define MF_LOAD         (1 << 0)
55 #define MF_EXCLUSIVE    (1 << 1)
56 #define MF_FORCE        (1 << 2)
57 #define MF_ARCHIP       (1 << 3)
58
59 #ifndef SHA256_DIGEST_SIZE
60 #define SHA256_DIGEST_SIZE 32
61 #endif
62 /* hex representation of sha256 value takes up double the sha256 size */
63 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64
65 #define block_size (1<<22) //FIXME this should be defined here?
66
67 /* transparency byte + max object len in disk */
68 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69
70 /* Map header contains:
71  *      map version
72  *      volume size
73  */
74 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75
76
77 #define MAPPER_PREFIX "archip_"
78 #define MAPPER_PREFIX_LEN 7
79
80 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82
83 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85 #endif
86
87 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88
89 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91 #endif
92
93 #define MAX_VOLUME_SIZE \
94 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95
96
97 //char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98
99 /* pithos considers this a block full of zeros, so should we.
100  * it is actually the sha256 hash of nothing.
101  */
102 char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104
105 /* dispatch_internal mapper states */
106 enum mapper_state {
107         ACCEPTED = 0,
108         WRITING = 1,
109         COPYING = 2,
110         DELETING = 3,
111         DROPPING_CACHE = 4
112 };
113
114 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115
116
117 /* mapper object flags */
118 #define MF_OBJECT_EXIST         (1 << 0)
119 #define MF_OBJECT_COPYING       (1 << 1)
120 #define MF_OBJECT_WRITING       (1 << 2)
121 #define MF_OBJECT_DELETING      (1 << 3)
122 #define MF_OBJECT_DESTROYED     (1 << 5)
123 #define MF_OBJECT_SNAPSHOTTING  (1 << 6)
124
125 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126                                 MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127 struct map_node {
128         uint32_t flags;
129         uint32_t objectidx;
130         uint32_t objectlen;
131         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
132         struct map *map;
133         uint32_t ref;
134         uint32_t waiters;
135         st_cond_t cond;
136 };
137
138
139 #define wait_on_pr(__pr, __condition__)         \
140         do {                                    \
141                 ta--;                           \
142                 __get_mapper_io(pr)->active = 0;\
143                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144                 st_cond_wait(__pr->cond);       \
145         } while (__condition__)
146
147 #define wait_on_mapnode(__mn, __condition__)    \
148         do {                                    \
149                 ta--;                           \
150                 __mn->waiters++;                \
151                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153                 st_cond_wait(__mn->cond);       \
154         } while (__condition__)
155
156 #define wait_on_map(__map, __condition__)       \
157         do {                                    \
158                 ta--;                           \
159                 __map->waiters++;               \
160                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161                                    __map, __map->volume, __map->waiters, ta); \
162                 st_cond_wait(__map->cond);      \
163         } while (__condition__)
164
165 #define signal_pr(__pr)                         \
166         do {                                    \
167                 if (!__get_mapper_io(pr)->active){\
168                         ta++;                   \
169                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170                         __get_mapper_io(pr)->active = 1;\
171                         st_cond_signal(__pr->cond);     \
172                 }                               \
173         }while(0)
174
175 #define signal_map(__map)                       \
176         do {                                    \
177                 if (__map->waiters) {           \
178                         ta += 1;                \
179                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
180                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
181                         __map->waiters--;       \
182                         st_cond_signal(__map->cond);    \
183                 }                               \
184         }while(0)
185
186 #define signal_mapnode(__mn)                    \
187         do {                                    \
188                 if (__mn->waiters) {            \
189                         ta += __mn->waiters;    \
190                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
191                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
192                         __mn->waiters = 0;      \
193                         st_cond_broadcast(__mn->cond);  \
194                 }                               \
195         }while(0)
196
197
198 /* map flags */
199 #define MF_MAP_LOADING          (1 << 0)
200 #define MF_MAP_DESTROYED        (1 << 1)
201 #define MF_MAP_WRITING          (1 << 2)
202 #define MF_MAP_DELETING         (1 << 3)
203 #define MF_MAP_DROPPING_CACHE   (1 << 4)
204 #define MF_MAP_EXCLUSIVE        (1 << 5)
205 #define MF_MAP_OPENING          (1 << 6)
206 #define MF_MAP_CLOSING          (1 << 7)
207 #define MF_MAP_DELETED          (1 << 8)
208 #define MF_MAP_SNAPSHOTTING     (1 << 9)
209
210 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
211                                 MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|          \
212                                 MF_MAP_SNAPSHOTTING)
213
214 struct map {
215         uint32_t version;
216         uint32_t flags;
217         uint64_t size;
218         uint32_t volumelen;
219         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
220         xhash_t *objects;       /* obj_index --> map_node */
221         uint32_t ref;
222         uint32_t waiters;
223         st_cond_t cond;
224 };
225
226 struct mapperd {
227         xport bportno;          /* blocker that accesses data */
228         xport mbportno;         /* blocker that accesses maps */
229         xhash_t *hashmaps; // hash_function(target) --> struct map
230 };
231
232 struct mapper_io {
233         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
234         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
235         struct map_node *copyup_node;
236         volatile int err;                       /* error flag */
237         volatile uint64_t del_pending;
238         volatile uint64_t snap_pending;
239         uint64_t delobj;
240         uint64_t dcobj;
241         cb_t cb;
242         enum mapper_state state;
243         volatile int active;
244 };
245
246 /* global vars */
247 struct mapperd *mapper;
248
249 void print_map(struct map *m);
250
251
252 void custom_peer_usage()
253 {
254         fprintf(stderr, "Custom peer options: \n"
255                         "-bp  : port for block blocker(!)\n"
256                         "-mbp : port for map blocker\n"
257                         "\n");
258 }
259
260
261 /*
262  * Helper functions
263  */
264
265 static inline struct mapperd * __get_mapperd(struct peerd *peer)
266 {
267         return (struct mapperd *) peer->priv;
268 }
269
270 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
271 {
272         return (struct mapper_io *) pr->priv;
273 }
274
275 static inline uint64_t calc_map_obj(struct map *map)
276 {
277         if (map->size == -1)
278                 return 0;
279         uint64_t nr_objs = map->size / block_size;
280         if (map->size % block_size)
281                 nr_objs++;
282         return nr_objs;
283 }
284
285 static uint32_t calc_nr_obj(struct xseg_request *req)
286 {
287         unsigned int r = 1;
288         uint64_t rem_size = req->size;
289         uint64_t obj_offset = req->offset & (block_size -1); //modulo
290         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
291         rem_size -= obj_size;
292         while (rem_size > 0) {
293                 obj_size = (rem_size > block_size) ? block_size : rem_size;
294                 rem_size -= obj_size;
295                 r++;
296         }
297
298         return r;
299 }
300
301 /* hexlify function.
302  * Unsafe. Doesn't check if data length is odd!
303  */
304
305 static void hexlify(unsigned char *data, char *hex)
306 {
307         int i;
308         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
309                 sprintf(hex+2*i, "%02x", data[i]);
310 }
311
312 static void unhexlify(char *hex, unsigned char *data)
313 {
314         int i;
315         char c;
316         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
317                 data[i] = 0;
318                 c = hex[2*i];
319                 if (isxdigit(c)){
320                         if (isdigit(c)){
321                                 c-= '0';
322                         }
323                         else {
324                                 c = tolower(c);
325                                 c = c-'a' + 10;
326                         }
327                 }
328                 else {
329                         c = 0;
330                 }
331                 data[i] |= (c << 4) & 0xF0;
332                 c = hex[2*i+1];
333                 if (isxdigit(c)){
334                         if (isdigit(c)){
335                                 c-= '0';
336                         }
337                         else {
338                                 c = tolower(c);
339                                 c = c-'a' + 10;
340                         }
341                 }
342                 else {
343                         c = 0;
344                 }
345                 data[i] |= c & 0x0F;
346         }
347 }
348
349 void merkle_hash(unsigned char *hashes, unsigned long len,
350                 unsigned char hash[SHA256_DIGEST_SIZE])
351 {
352         uint32_t i, l, s = 2;
353         uint32_t nr = len/SHA256_DIGEST_SIZE;
354         unsigned char *buf;
355         unsigned char tmp_hash[SHA256_DIGEST_SIZE];
356
357         if (!nr){
358                 SHA256(hashes, 0, hash);
359                 return;
360         }
361         if (nr == 1){
362                 memcpy(hash, hashes, SHA256_DIGEST_SIZE);
363                 return;
364         }
365         while (s < nr)
366                 s = s << 1;
367         buf = malloc(sizeof(unsigned char)* SHA256_DIGEST_SIZE * s);
368         memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
369         memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
370         for (l = s; l > 1; l = l/2) {
371                 for (i = 0; i < l; i += 2) {
372                         SHA256(buf + (i * SHA256_DIGEST_SIZE),
373                                         2 * SHA256_DIGEST_SIZE, tmp_hash);
374                         memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
375                                         tmp_hash, SHA256_DIGEST_SIZE);
376                 }
377         }
378         memcpy(hash, buf, SHA256_DIGEST_SIZE);
379 }
380
381 /*
382  * Maps handling functions
383  */
384
385 static struct map * find_map(struct mapperd *mapper, char *volume)
386 {
387         struct map *m = NULL;
388         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
389                                 (xhashidx *) &m);
390         if (r < 0)
391                 return NULL;
392         return m;
393 }
394
395 static struct map * find_map_len(struct mapperd *mapper, char *target,
396                                         uint32_t targetlen, uint32_t flags)
397 {
398         char buf[XSEG_MAX_TARGETLEN+1];
399         if (flags & MF_ARCHIP){
400                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
401                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
402                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
403                 targetlen += MAPPER_PREFIX_LEN;
404         }
405         else {
406                 strncpy(buf, target, targetlen);
407                 buf[targetlen] = 0;
408         }
409
410         if (targetlen > MAX_VOLUME_LEN){
411                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
412                                         targetlen, MAX_VOLUME_LEN);
413                 return NULL;
414         }
415
416         XSEGLOG2(&lc, D, "looking up map %s, len %u",
417                         buf, targetlen);
418         return find_map(mapper, buf);
419 }
420
421
422 static int insert_map(struct mapperd *mapper, struct map *map)
423 {
424         int r = -1;
425
426         if (find_map(mapper, map->volume)){
427                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
428                 goto out;
429         }
430
431         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
432                         map->volume, strlen(map->volume), (unsigned long) map);
433         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
434         while (r == -XHASH_ERESIZE) {
435                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
436                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
437                 if (!new_hashmap){
438                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
439                                         (unsigned long long) shift);
440                         goto out;
441                 }
442                 mapper->hashmaps = new_hashmap;
443                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
444         }
445 out:
446         return r;
447 }
448
449 static int remove_map(struct mapperd *mapper, struct map *map)
450 {
451         int r = -1;
452
453         //assert no pending pr on map
454
455         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
456         while (r == -XHASH_ERESIZE) {
457                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
458                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
459                 if (!new_hashmap){
460                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
461                                         (unsigned long long) shift);
462                         goto out;
463                 }
464                 mapper->hashmaps = new_hashmap;
465                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
466         }
467 out:
468         return r;
469 }
470
471 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
472 {
473         int r;
474         xport p;
475         struct peerd *peer = pr->peer;
476         struct xseg_request *req;
477         struct mapperd *mapper = __get_mapperd(peer);
478         void *dummy;
479
480         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
481
482         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
483         if (!req){
484                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
485                                 map->volume);
486                 goto out_err;
487         }
488
489         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
490         if (r < 0){
491                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
492                                 map->volume);
493                 goto out_put;
494         }
495
496         char *reqtarget = xseg_get_target(peer->xseg, req);
497         if (!reqtarget)
498                 goto out_put;
499         strncpy(reqtarget, map->volume, req->targetlen);
500         req->op = X_RELEASE;
501         req->size = 0;
502         req->offset = 0;
503         r = xseg_set_req_data(peer->xseg, req, pr);
504         if (r < 0){
505                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
506                                 map->volume);
507                 goto out_put;
508         }
509         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
510         if (p == NoPort){
511                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
512                                 map->volume);
513                 goto out_unset;
514         }
515         r = xseg_signal(peer->xseg, p);
516         map->flags |= MF_MAP_CLOSING;
517
518         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
519         return req;
520
521 out_unset:
522         xseg_get_req_data(peer->xseg, req, &dummy);
523 out_put:
524         xseg_put_request(peer->xseg, req, pr->portno);
525 out_err:
526         return NULL;
527 }
528
529 static int close_map(struct peer_req *pr, struct map *map)
530 {
531         int err;
532         struct xseg_request *req;
533         struct peerd *peer = pr->peer;
534
535         req = __close_map(pr, map);
536         if (!req)
537                 return -1;
538         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
539         map->flags &= ~MF_MAP_CLOSING;
540         err = req->state & XS_FAILED;
541         xseg_put_request(peer->xseg, req, pr->portno);
542         if (err)
543                 return -1;
544         return 0;
545 }
546
547 /*
548 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
549                                 char *target, uint32_t targetlen, struct map **m)
550 {
551         struct mapperd *mapper = __get_mapperd(peer);
552         int r;
553         *m = find_map(mapper, target, targetlen);
554         if (*m) {
555                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
556                 if ((*m)->flags & MF_MAP_NOT_READY) {
557                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
558                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
559                         return MF_PENDING;
560                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
561                 //      return -1;
562                 // 
563                 }else {
564                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
565                         return 0;
566                 }
567         }
568         r = open_map(peer, pr, target, targetlen, 0);
569         if (r < 0)
570                 return -1; //error
571         return MF_PENDING;      
572 }
573 */
574 /*
575  * Object handling functions
576  */
577
578 struct map_node *find_object(struct map *map, uint64_t obj_index)
579 {
580         struct map_node *mn;
581         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
582         if (r < 0)
583                 return NULL;
584         return mn;
585 }
586
587 static int insert_object(struct map *map, struct map_node *mn)
588 {
589         //FIXME no find object first
590         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
591         if (r == -XHASH_ERESIZE) {
592                 unsigned long shift = xhash_grow_size_shift(map->objects);
593                 map->objects = xhash_resize(map->objects, shift, NULL);
594                 if (!map->objects)
595                         return -1;
596                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
597         }
598         return r;
599 }
600
601
602 /*
603  * map read/write functions
604  *
605  * version 0 -> pithos map
606  * version 1 -> archipelago version 1
607  *
608  *
609  * functions
610  *      int read_object(struct map_node *mn, unsigned char *buf)
611  *      int prepare_write_object(struct peer_req *pr, struct map *map,
612  *                              struct map_node *mn, struct xseg_request *req)
613  *      int read_map(struct map *m, unsigned char * data)
614  *      int prepare_write_map(struct peer_req *pr, struct map *map,
615  *                                      struct xseg_request *req)
616  */
617
618 struct map_functions {
619         int (*read_object)(struct map_node *mn, unsigned char *buf);
620         int (*prepare_write_object)(struct peer_req *pr, struct map *map,
621                                 struct map_node *mn, struct xseg_request *req);
622         int (*read_map)(struct map *m, unsigned char * data);
623         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
624                                         struct xseg_request *req);
625 };
626
627 /* version 0 functions */
628
629 /* no header */
630 #define v0_mapheader_size 0
631 /* just the unhexlified name */
632 #define v0_objectsize_in_map SHA256_DIGEST_SIZE
633
634 static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
635 {
636         hexlify(buf, mn->object);
637         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
638         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
639         mn->flags = MF_OBJECT_EXIST;
640
641         return 0;
642 }
643
644 static void v0_object_to_map(struct map_node *mn, unsigned char *data)
645 {
646         unhexlify(mn->object, data);
647 }
648
649 static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
650                         struct map_node *mn, struct xseg_request *req)
651 {
652         struct peerd *peer = pr->peer;
653         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
654         if (r < 0){
655                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
656                                 "(Map: %s [%llu]",
657                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
658                 return -1;
659         }
660         char *target = xseg_get_target(peer->xseg, req);
661         strncpy(target, map->volume, req->targetlen);
662         req->size = req->datalen;
663         req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
664
665         unsigned char *data = xseg_get_data(pr->peer->xseg, req);
666         v0_object_to_map(mn, data);
667         return -1;
668 }
669
670 static int read_map_v0(struct map *m, unsigned char * data)
671 {
672         int r;
673         struct map_node *map_node;
674         uint64_t i;
675         uint64_t pos = 0;
676         uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
677         XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
678         char nulls[SHA256_DIGEST_SIZE];
679         memset(nulls, 0, SHA256_DIGEST_SIZE);
680         map_node = calloc(max_nr_objs, sizeof(struct map_node));
681         if (!map_node)
682                 return -1;
683         for (i = 0; i < max_nr_objs; i++) {
684                 if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
685                         break;
686                 map_node[i].objectidx = i;
687                 map_node[i].map = m;
688                 map_node[i].waiters = 0;
689                 map_node[i].ref = 1;
690                 map_node[i].cond = st_cond_new(); //FIXME err check;
691                 read_object_v0(&map_node[i], data+pos);
692                 pos += v0_objectsize_in_map;
693                 r = insert_object(m, &map_node[i]); //FIXME error check
694         }
695         XSEGLOG2(&lc, D, "Found %llu objects", i);
696         m->size = i * block_size;
697         return 0;
698 }
699
700 static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
701                                 struct xseg_request *req)
702 {
703         struct peerd *peer = pr->peer;
704         uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
705         struct map_node *mn;
706         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
707                         v0_mapheader_size + max_objidx * v0_objectsize_in_map);
708         if (r < 0){
709                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
710                 return -1;
711         }
712         char *target = xseg_get_target(peer->xseg, req);
713         strncpy(target, map->volume, req->targetlen);
714         char *data = xseg_get_data(peer->xseg, req);
715
716         req->op = X_WRITE;
717         req->size = req->datalen;
718         req->offset = 0;
719
720         for (i = 0; i < max_objidx; i++) {
721                 mn = find_object(map, i);
722                 if (!mn){
723                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
724                                         (unsigned long long) i, map->volume);
725                         return -1;
726                 }
727                 v0_object_to_map(mn, (unsigned char *)(data+pos));
728                 pos += v0_objectsize_in_map;
729         }
730         XSEGLOG2(&lc, D, "Prepared %llu objects", i);
731         return 0;
732 }
733
734 /* static struct map_functions map_functions_v0 =       { */
735 /*                      .read_object = read_object_v0, */
736 /*                      .read_map = read_map_v0, */
737 /*                      .prepare_write_object = prepare_write_object_v0, */
738 /*                      .prepare_write_map = prepare_write_map_v0 */
739 /* }; */
740 #define map_functions_v0 {                              \
741                         .read_object = read_object_v0,  \
742                         .read_map = read_map_v0,        \
743                         .prepare_write_object = prepare_write_object_v0,\
744                         .prepare_write_map = prepare_write_map_v0       \
745                         }
746 /* v1 functions */
747
748 /* transparency byte + max object len in disk */
749 #define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
750
751 /* Map header contains:
752  *      map version
753  *      volume size
754  */
755 #define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
756
757 static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
758 {
759         char c = buf[0];
760         mn->flags = 0;
761         if (c){
762                 mn->flags |= MF_OBJECT_EXIST;
763                 strcpy(mn->object, MAPPER_PREFIX);
764                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
765                 mn->object[MAX_OBJECT_LEN] = 0;
766                 mn->objectlen = strlen(mn->object);
767         }
768         else {
769                 mn->flags &= ~MF_OBJECT_EXIST;
770                 hexlify(buf+1, mn->object);
771                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
772                 mn->objectlen = strlen(mn->object);
773         }
774         return 0;
775 }
776
777 static inline void v1_object_to_map(char* buf, struct map_node *mn)
778 {
779         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
780         if (buf[0]){
781                 /* strip common prefix */
782                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
783         }
784         else {
785                 unhexlify(mn->object, (unsigned char *)(buf+1));
786         }
787 }
788
789 static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
790                                 struct map_node *mn, struct xseg_request *req)
791 {
792         struct peerd *peer = pr->peer;
793         int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
794         if (r < 0){
795                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
796                                 "(Map: %s [%llu]",
797                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
798                 return -1;
799         }
800         char *target = xseg_get_target(peer->xseg, req);
801         strncpy(target, map->volume, req->targetlen);
802         req->size = req->datalen;
803         req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;
804
805         char *data = xseg_get_data(pr->peer->xseg, req);
806         v1_object_to_map(data, mn);
807         return 0;
808 }
809
810 static int read_map_v1(struct map *m, unsigned char * data)
811 {
812         int r;
813         struct map_node *map_node;
814         uint64_t i;
815         uint64_t pos = 0;
816         uint64_t nr_objs;
817
818         /* read header */
819         m->version = *(uint32_t *) (data + pos);
820         pos += sizeof(uint32_t);
821         m->size = *(uint64_t *) (data + pos);
822         pos += sizeof(uint64_t);
823
824         /* read objects */
825         nr_objs = m->size / block_size;
826         if (m->size % block_size)
827                 nr_objs++;
828         map_node = calloc(nr_objs, sizeof(struct map_node));
829         if (!map_node)
830                 return -1;
831
832         for (i = 0; i < nr_objs; i++) {
833                 map_node[i].map = m;
834                 map_node[i].objectidx = i;
835                 map_node[i].waiters = 0;
836                 map_node[i].ref = 1;
837                 map_node[i].cond = st_cond_new(); //FIXME err check;
838                 read_object_v1(&map_node[i], data+pos);
839                 pos += objectsize_in_map;
840                 r = insert_object(m, &map_node[i]); //FIXME error check
841         }
842         return 0;
843 }
844
845 static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
846                                 struct xseg_request *req)
847 {
848         struct peerd *peer = pr->peer;
849         uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
850         struct map_node *mn;
851
852         int r = xseg_prep_request(peer->xseg, req, m->volumelen,
853                         v1_mapheader_size + max_objidx * v1_objectsize_in_map);
854         if (r < 0){
855                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
856                 return -1;
857         }
858         char *target = xseg_get_target(peer->xseg, req);
859         strncpy(target, m->volume, req->targetlen);
860         char *data = xseg_get_data(peer->xseg, req);
861
862         memcpy(data + pos, &m->version, sizeof(m->version));
863         pos += sizeof(m->version);
864         memcpy(data + pos, &m->size, sizeof(m->size));
865         pos += sizeof(m->size);
866
867         req->op = X_WRITE;
868         req->size = req->datalen;
869         req->offset = 0;
870
871         for (i = 0; i < max_objidx; i++) {
872                 mn = find_object(m, i);
873                 if (!mn){
874                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
875                                         (unsigned long long) i, m->volume);
876                         return -1;
877                 }
878                 v1_object_to_map(data+pos, mn);
879                 pos += v1_objectsize_in_map;
880         }
881         return 0;
882 }
883
884 /* static struct map_functions map_functions_v1 =       { */
885 /*                      .read_object = read_object_v1, */
886 /*                      .read_map = read_map_v1, */
887 /*                      .prepare_write_object = prepare_write_object_v1, */
888 /*                      .prepare_write_map = prepare_write_map_v1 */
889 /* }; */
890 #define map_functions_v1 {                              \
891                         .read_object = read_object_v1,  \
892                         .read_map = read_map_v1,        \
893                         .prepare_write_object = prepare_write_object_v1,\
894                         .prepare_write_map = prepare_write_map_v1       \
895                         }
896
897 static struct map_functions map_functions[] = { map_functions_v0,
898                                                 map_functions_v1 };
899 #define MAP_LATEST_VERSION 1
900 /* end of functions */
901
902
903
904
905
906 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
907 {
908         hexlify(buf, mn->object);
909         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
910         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
911         mn->flags = MF_OBJECT_EXIST;
912 }
913
914 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
915 {
916         char c = buf[0];
917         mn->flags = 0;
918         if (c){
919                 mn->flags |= MF_OBJECT_EXIST;
920                 strcpy(mn->object, MAPPER_PREFIX);
921                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
922                 mn->object[MAX_OBJECT_LEN] = 0;
923                 mn->objectlen = strlen(mn->object);
924         }
925         else {
926                 hexlify(buf+1, mn->object);
927                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
928                 mn->objectlen = strlen(mn->object);
929         }
930
931 }
932
933 static inline void object_to_map(char* buf, struct map_node *mn)
934 {
935         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
936         if (buf[0]){
937                 /* strip common prefix */
938                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
939         }
940         else {
941                 unhexlify(mn->object, (unsigned char *)(buf+1));
942         }
943 }
944
945 static inline void mapheader_to_map(struct map *m, char *buf)
946 {
947         uint64_t pos = 0;
948         memcpy(buf + pos, &m->version, sizeof(m->version));
949         pos += sizeof(m->version);
950         memcpy(buf + pos, &m->size, sizeof(m->size));
951         pos += sizeof(m->size);
952 }
953
954
955 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
956                                 struct map *map, struct map_node *mn)
957 {
958         int r;
959         void *dummy;
960         struct mapperd *mapper = __get_mapperd(peer);
961         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
962                                                         mapper->mbportno, X_ALLOC);
963         if (!req){
964                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
965                                 "(Map: %s [%llu]",
966                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
967                 goto out_err;
968         }
969
970         r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
971         if (r < 0){
972                 XSEGLOG2(&lc, E, "Cannot prepare write object");
973                 goto out_put;
974         }
975         req->op = X_WRITE;
976
977         r = xseg_set_req_data(peer->xseg, req, pr);
978         if (r < 0){
979                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
980                                 "(Map: %s [%llu]",
981                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
982                 goto out_put;
983         }
984         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
985         if (p == NoPort){
986                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
987                                 "(Map: %s [%llu]",
988                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
989                 goto out_unset;
990         }
991         r = xseg_signal(peer->xseg, p);
992         if (r < 0)
993                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
994
995         XSEGLOG2(&lc, I, "Writing object %s \n\t"
996                         "Map: %s [%llu]",
997                         mn->object, map->volume, (unsigned long long) mn->objectidx);
998
999         return req;
1000
1001 out_unset:
1002         xseg_get_req_data(peer->xseg, req, &dummy);
1003 out_put:
1004         xseg_put_request(peer->xseg, req, pr->portno);
1005 out_err:
1006         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
1007                         "(Map: %s [%llu]",
1008                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1009         return NULL;
1010 }
1011
1012 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
1013 {
1014         int r;
1015         void *dummy;
1016         struct peerd *peer = pr->peer;
1017         struct mapperd *mapper = __get_mapperd(peer);
1018         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1019                                                         mapper->mbportno, X_ALLOC);
1020         if (!req){
1021                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
1022                 goto out_err;
1023         }
1024
1025         r = map_functions[map->version].prepare_write_map(pr, map, req);
1026         if (r < 0){
1027                 XSEGLOG2(&lc, E, "Cannot prepare write map");
1028                 goto out_put;
1029         }
1030
1031         req->op = X_WRITE;
1032
1033         r = xseg_set_req_data(peer->xseg, req, pr);
1034         if (r < 0){
1035                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1036                                 map->volume);
1037                 goto out_put;
1038         }
1039         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1040         if (p == NoPort){
1041                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1042                                 map->volume);
1043                 goto out_unset;
1044         }
1045         r = xseg_signal(peer->xseg, p);
1046         if (r < 0)
1047                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1048
1049         map->flags |= MF_MAP_WRITING;
1050         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1051         return req;
1052
1053 out_unset:
1054         xseg_get_req_data(peer->xseg, req, &dummy);
1055 out_put:
1056         xseg_put_request(peer->xseg, req, pr->portno);
1057 out_err:
1058         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1059         return NULL;
1060 }
1061
1062 static int write_map(struct peer_req* pr, struct map *map)
1063 {
1064         int r = 0;
1065         struct peerd *peer = pr->peer;
1066         struct xseg_request *req = __write_map(pr, map);
1067         if (!req)
1068                 return -1;
1069         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1070         if (req->state & XS_FAILED)
1071                 r = -1;
1072         xseg_put_request(peer->xseg, req, pr->portno);
1073         map->flags &= ~MF_MAP_WRITING;
1074         return r;
1075 }
1076
1077 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1078 {
1079         int r;
1080         xport p;
1081         struct xseg_request *req;
1082         struct peerd *peer = pr->peer;
1083         struct mapperd *mapper = __get_mapperd(peer);
1084         void *dummy;
1085
1086         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1087
1088         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1089         if (!req){
1090                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1091                                 m->volume);
1092                 goto out_fail;
1093         }
1094
1095         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1096         if (r < 0){
1097                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1098                                 m->volume);
1099                 goto out_put;
1100         }
1101
1102         char *reqtarget = xseg_get_target(peer->xseg, req);
1103         if (!reqtarget)
1104                 goto out_put;
1105         strncpy(reqtarget, m->volume, req->targetlen);
1106         req->op = X_READ;
1107         req->size = block_size;
1108         req->offset = 0;
1109         r = xseg_set_req_data(peer->xseg, req, pr);
1110         if (r < 0){
1111                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1112                                 m->volume);
1113                 goto out_put;
1114         }
1115         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1116         if (p == NoPort){
1117                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1118                                 m->volume);
1119                 goto out_unset;
1120         }
1121         r = xseg_signal(peer->xseg, p);
1122
1123         m->flags |= MF_MAP_LOADING;
1124         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1125         return req;
1126
1127 out_unset:
1128         xseg_get_req_data(peer->xseg, req, &dummy);
1129 out_put:
1130         xseg_put_request(peer->xseg, req, pr->portno);
1131 out_fail:
1132         return NULL;
1133 }
1134
1135 static int read_map (struct map *map, unsigned char *buf)
1136 {
1137         char nulls[SHA256_DIGEST_SIZE];
1138         memset(nulls, 0, SHA256_DIGEST_SIZE);
1139
1140         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1141         if (r) {
1142                 XSEGLOG2(&lc, E, "Read zeros");
1143                 return -1;
1144         }
1145         //type 1, archip type, type 0 pithos map
1146         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1147         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1148         uint32_t version;
1149         if (type)
1150                 version = *(uint32_t *) (buf); //version should always be the first uint32_t
1151         else
1152                 version = 0;
1153         if (version > MAP_LATEST_VERSION){
1154                 XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1155                                 map->volume, version);
1156                 return -1;
1157         }
1158
1159         r = map_functions[version].read_map(map, buf);
1160         if (r < 0){
1161                 XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1162                 return -1;
1163         }
1164
1165         print_map(map);
1166         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1167         return 0;
1168
1169 }
1170
1171 static int load_map(struct peer_req *pr, struct map *map)
1172 {
1173         int r = 0;
1174         struct xseg_request *req;
1175         struct peerd *peer = pr->peer;
1176         req = __load_map(pr, map);
1177         if (!req)
1178                 return -1;
1179         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1180         map->flags &= ~MF_MAP_LOADING;
1181         if (req->state & XS_FAILED){
1182                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1183                 xseg_put_request(peer->xseg, req, pr->portno);
1184                 return -1;
1185         }
1186         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1187         xseg_put_request(peer->xseg, req, pr->portno);
1188         return r;
1189 }
1190
1191 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1192                                                 uint32_t flags)
1193 {
1194         int r;
1195         xport p;
1196         struct xseg_request *req;
1197         struct peerd *peer = pr->peer;
1198         struct mapperd *mapper = __get_mapperd(peer);
1199         void *dummy;
1200
1201         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1202
1203         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1204         if (!req){
1205                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1206                                 m->volume);
1207                 goto out_fail;
1208         }
1209
1210         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1211         if (r < 0){
1212                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1213                                 m->volume);
1214                 goto out_put;
1215         }
1216
1217         char *reqtarget = xseg_get_target(peer->xseg, req);
1218         if (!reqtarget)
1219                 goto out_put;
1220         strncpy(reqtarget, m->volume, req->targetlen);
1221         req->op = X_ACQUIRE;
1222         req->size = block_size;
1223         req->offset = 0;
1224         if (!(flags & MF_FORCE))
1225                 req->flags = XF_NOSYNC;
1226         r = xseg_set_req_data(peer->xseg, req, pr);
1227         if (r < 0){
1228                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1229                                 m->volume);
1230                 goto out_put;
1231         }
1232         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1233         if (p == NoPort){ 
1234                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1235                                 m->volume);
1236                 goto out_unset;
1237         }
1238         r = xseg_signal(peer->xseg, p);
1239
1240         m->flags |= MF_MAP_OPENING;
1241         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1242         return req;
1243
1244 out_unset:
1245         xseg_get_req_data(peer->xseg, req, &dummy);
1246 out_put:
1247         xseg_put_request(peer->xseg, req, pr->portno);
1248 out_fail:
1249         return NULL;
1250 }
1251
1252 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1253 {
1254         int err;
1255         struct xseg_request *req;
1256         struct peerd *peer = pr->peer;
1257
1258         req = __open_map(pr, map, flags);
1259         if (!req){
1260                 return -1;
1261         }
1262         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1263         map->flags &= ~MF_MAP_OPENING;
1264         err = req->state & XS_FAILED;
1265         xseg_put_request(peer->xseg, req, pr->portno);
1266         if (err)
1267                 return -1;
1268         else
1269                 map->flags |= MF_MAP_EXCLUSIVE;
1270         return 0;
1271 }
1272
1273 /*
1274  * copy up functions
1275  */
1276
1277 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1278 {
1279         int r = 0;
1280         if (mn){
1281                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1282                                 req, mn, mio);
1283                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1284                 if (r == -XHASH_ERESIZE) {
1285                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1286                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1287                         if (!new_hashmap)
1288                                 goto out;
1289                         mio->copyups_nodes = new_hashmap;
1290                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1291                 }
1292                 if (r < 0)
1293                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1294                                         req, mn, mio);
1295         }
1296         else {
1297                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1298                                 req, mio);
1299                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1300                 if (r == -XHASH_ERESIZE) {
1301                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1302                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1303                         if (!new_hashmap)
1304                                 goto out;
1305                         mio->copyups_nodes = new_hashmap;
1306                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1307                 }
1308                 if (r < 0)
1309                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1310                                         req, mio);
1311         }
1312 out:
1313         return r;
1314 }
1315
1316 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1317 {
1318         struct map_node *mn;
1319         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1320         if (r < 0){
1321                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1322                 return NULL;
1323         }
1324         XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1325         return mn;
1326 }
1327
1328 static struct xseg_request * __snapshot_object(struct peer_req *pr,
1329                                                 struct map_node *mn)
1330 {
1331         struct peerd *peer = pr->peer;
1332         struct mapperd *mapper = __get_mapperd(peer);
1333         struct mapper_io *mio = __get_mapper_io(pr);
1334         //struct map *map = mn->map;
1335         void *dummy;
1336         int r = -1;
1337         xport p;
1338
1339         //assert mn->volume != zero_block
1340         //assert mn->flags & MF_OBJECT_EXIST
1341         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1342                                                 mapper->bportno, X_ALLOC);
1343         if (!req){
1344                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1345                 goto out_err;
1346         }
1347         r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1348                                 sizeof(struct xseg_request_snapshot));
1349         if (r < 0){
1350                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1351                 goto out_put;
1352         }
1353
1354         char *target = xseg_get_target(peer->xseg, req);
1355         strncpy(target, mn->object, req->targetlen);
1356
1357         struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1358         xsnapshot->target[0] = 0;
1359         xsnapshot->targetlen = 0;
1360
1361         req->offset = 0;
1362         req->size = block_size;
1363         req->op = X_SNAPSHOT;
1364         r = xseg_set_req_data(peer->xseg, req, pr);
1365         if (r<0){
1366                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1367                 goto out_put;
1368         }
1369         r = __set_copyup_node(mio, req, mn);
1370         if (r < 0)
1371                 goto out_unset;
1372         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1373         if (p == NoPort) {
1374                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1375                 goto out_mapper_unset;
1376         }
1377         xseg_signal(peer->xseg, p);
1378
1379         mn->flags |= MF_OBJECT_SNAPSHOTTING;
1380         XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1381         return req;
1382
1383 out_mapper_unset:
1384         __set_copyup_node(mio, req, NULL);
1385 out_unset:
1386         xseg_get_req_data(peer->xseg, req, &dummy);
1387 out_put:
1388         xseg_put_request(peer->xseg, req, pr->portno);
1389 out_err:
1390         XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1391         return NULL;
1392 }
1393
1394 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1395 {
1396         struct mapperd *mapper = __get_mapperd(peer);
1397         struct mapper_io *mio = __get_mapper_io(pr);
1398         struct map *map = mn->map;
1399         void *dummy;
1400         int r = -1;
1401         xport p;
1402
1403         uint32_t newtargetlen;
1404         char new_target[MAX_OBJECT_LEN + 1];
1405         unsigned char sha[SHA256_DIGEST_SIZE];
1406
1407         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1408
1409         char tmp[XSEG_MAX_TARGETLEN + 1];
1410         uint32_t tmplen;
1411         strncpy(tmp, map->volume, map->volumelen);
1412         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1413         tmp[XSEG_MAX_TARGETLEN] = 0;
1414         tmplen = strlen(tmp);
1415         SHA256((unsigned char *)tmp, tmplen, sha);
1416         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1417         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1418
1419
1420         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1421                 goto copyup_zeroblock;
1422
1423         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1424                                                 mapper->bportno, X_ALLOC);
1425         if (!req){
1426                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1427                 goto out_err;
1428         }
1429         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1430                                 sizeof(struct xseg_request_copy));
1431         if (r < 0){
1432                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1433                 goto out_put;
1434         }
1435
1436         char *target = xseg_get_target(peer->xseg, req);
1437         strncpy(target, new_target, req->targetlen);
1438
1439         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1440         strncpy(xcopy->target, mn->object, mn->objectlen);
1441         xcopy->targetlen = mn->objectlen;
1442
1443         req->offset = 0;
1444         req->size = block_size;
1445         req->op = X_COPY;
1446         r = xseg_set_req_data(peer->xseg, req, pr);
1447         if (r<0){
1448                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1449                 goto out_put;
1450         }
1451         r = __set_copyup_node(mio, req, mn);
1452         if (r < 0)
1453                 goto out_unset;
1454         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1455         if (p == NoPort) {
1456                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1457                 goto out_mapper_unset;
1458         }
1459         xseg_signal(peer->xseg, p);
1460 //      mio->copyups++;
1461
1462         mn->flags |= MF_OBJECT_COPYING;
1463         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1464         return req;
1465
1466 out_mapper_unset:
1467         __set_copyup_node(mio, req, NULL);
1468 out_unset:
1469         xseg_get_req_data(peer->xseg, req, &dummy);
1470 out_put:
1471         xseg_put_request(peer->xseg, req, pr->portno);
1472 out_err:
1473         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1474         return NULL;
1475
1476 copyup_zeroblock:
1477         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1478                         "Proceeding in writing the new object in map");
1479         /* construct a tmp map_node for writing purposes */
1480         struct map_node newmn = *mn;
1481         newmn.flags = MF_OBJECT_EXIST;
1482         strncpy(newmn.object, new_target, newtargetlen);
1483         newmn.object[newtargetlen] = 0;
1484         newmn.objectlen = newtargetlen;
1485         newmn.objectidx = mn->objectidx; 
1486         req = object_write(peer, pr, map, &newmn);
1487         r = __set_copyup_node(mio, req, mn);
1488         if (r < 0)
1489                 return NULL;
1490         if (!req){
1491                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1492                                 "\n\t of map %s [%llu]",
1493                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1494                 return NULL;
1495         }
1496         mn->flags |= MF_OBJECT_WRITING;
1497         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1498         return req;
1499 }
1500
1501 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1502 {
1503         void *dummy;
1504         struct peerd *peer = pr->peer;
1505         struct mapperd *mapper = __get_mapperd(peer);
1506         struct mapper_io *mio = __get_mapper_io(pr);
1507         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1508                                                         mapper->bportno, X_ALLOC);
1509         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1510         if (!req){
1511                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1512                 goto out_err;
1513         }
1514         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1515         if (r < 0){
1516                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1517                 goto out_put;
1518         }
1519         char *target = xseg_get_target(peer->xseg, req);
1520         strncpy(target, mn->object, req->targetlen);
1521         req->op = X_DELETE;
1522         req->size = req->datalen;
1523         req->offset = 0;
1524         r = xseg_set_req_data(peer->xseg, req, pr);
1525         if (r < 0){
1526                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1527                 goto out_put;
1528         }
1529         r = __set_copyup_node(mio, req, mn);
1530         if (r < 0)
1531                 goto out_unset;
1532         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1533         if (p == NoPort){
1534                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1535                 goto out_mapper_unset;
1536         }
1537         r = xseg_signal(peer->xseg, p);
1538         mn->flags |= MF_OBJECT_DELETING;
1539         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1540         return req;
1541
1542 out_mapper_unset:
1543         __set_copyup_node(mio, req, NULL);
1544 out_unset:
1545         xseg_get_req_data(peer->xseg, req, &dummy);
1546 out_put:
1547         xseg_put_request(peer->xseg, req, pr->portno);
1548 out_err:
1549         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1550         return NULL;
1551 }
1552
1553 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1554 {
1555         void *dummy;
1556         struct peerd *peer = pr->peer;
1557         struct mapperd *mapper = __get_mapperd(peer);
1558         struct mapper_io *mio = __get_mapper_io(pr);
1559         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1560                                                         mapper->mbportno, X_ALLOC);
1561         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1562         if (!req){
1563                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1564                 goto out_err;
1565         }
1566         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1567         if (r < 0){
1568                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1569                 goto out_put;
1570         }
1571         char *target = xseg_get_target(peer->xseg, req);
1572         strncpy(target, map->volume, req->targetlen);
1573         req->op = X_DELETE;
1574         req->size = req->datalen;
1575         req->offset = 0;
1576         r = xseg_set_req_data(peer->xseg, req, pr);
1577         if (r < 0){
1578                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1579                 goto out_put;
1580         }
1581         /* do not check return value. just make sure there is no node set */
1582         __set_copyup_node(mio, req, NULL);
1583         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1584         if (p == NoPort){
1585                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1586                 goto out_unset;
1587         }
1588         r = xseg_signal(peer->xseg, p);
1589         map->flags |= MF_MAP_DELETING;
1590         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1591         return req;
1592
1593 out_unset:
1594         xseg_get_req_data(peer->xseg, req, &dummy);
1595 out_put:
1596         xseg_put_request(peer->xseg, req, pr->portno);
1597 out_err:
1598         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1599         return  NULL;
1600 }
1601
1602
1603 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1604 {
1605         struct map_node *mn = find_object(map, index);
1606         if (mn)
1607                 mn->ref++;
1608         return mn;
1609 }
1610
1611 static inline void put_mapnode(struct map_node *mn)
1612 {
1613         mn->ref--;
1614         if (!mn->ref){
1615                 //clean up mn
1616                 st_cond_destroy(mn->cond);
1617         }
1618 }
1619
1620 static inline void __get_map(struct map *map)
1621 {
1622         map->ref++;
1623 }
1624
1625 static inline void put_map(struct map *map)
1626 {
1627         struct map_node *mn;
1628         map->ref--;
1629         if (!map->ref){
1630                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1631                 //clean up map
1632                 uint64_t i;
1633                 for (i = 0; i < calc_map_obj(map); i++) {
1634                         mn = get_mapnode(map, i);
1635                         if (mn) {
1636                                 //make sure all pending operations on all objects are completed
1637                                 //this should never happen...
1638                                 if (mn->flags & MF_OBJECT_NOT_READY)
1639                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1640                                 mn->flags |= MF_OBJECT_DESTROYED;
1641                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1642                                 put_mapnode(mn); //matcing get_mapnode;
1643                                 //assert mn->ref == 0;
1644                         }
1645                 }
1646                 mn = find_object(map, 0);
1647                 if (mn)
1648                         free(mn);
1649                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1650                 free(map);
1651         }
1652 }
1653
1654 static struct map * create_map(struct mapperd *mapper, char *name,
1655                                 uint32_t namelen, uint32_t flags)
1656 {
1657         int r;
1658         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1659                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1660                                         namelen, MAX_VOLUME_LEN);
1661                 return NULL;
1662         }
1663         struct map *m = malloc(sizeof(struct map));
1664         if (!m){
1665                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1666                 goto out_err;
1667         }
1668         m->size = -1;
1669         if (flags & MF_ARCHIP){
1670                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1671                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1672                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1673                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1674                 m->version = 1; /* keep this hardcoded for now */
1675         }
1676         else {
1677                 strncpy(m->volume, name, namelen);
1678                 m->volume[namelen] = 0;
1679                 m->volumelen = namelen;
1680                 m->version = 0; /* version 0 should be pithos maps */
1681         }
1682         m->flags = 0;
1683         m->objects = xhash_new(3, INTEGER); 
1684         if (!m->objects){
1685                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1686                                 m->volume);
1687                 goto out_map;
1688         }
1689         m->ref = 1;
1690         m->waiters = 0;
1691         m->cond = st_cond_new(); //FIXME err check;
1692         r = insert_map(mapper, m);
1693         if (r < 0){
1694                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1695                 goto out_hash;
1696         }
1697
1698         return m;
1699
1700 out_hash:
1701         xhash_free(m->objects);
1702 out_map:
1703         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1704         free(m);
1705 out_err:
1706         return NULL;
1707 }
1708
1709
1710
1711 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1712 {
1713         struct peerd *peer = pr->peer;
1714         struct mapperd *mapper = __get_mapperd(peer);
1715         (void)mapper;
1716         struct mapper_io *mio = __get_mapper_io(pr);
1717         struct map_node *mn = __get_copyup_node(mio, req);
1718
1719         __set_copyup_node(mio, req, NULL);
1720
1721         //assert req->op = X_DELETE;
1722         //assert pr->req->op = X_DELETE only map deletions make delete requests
1723         //assert mio->del_pending > 0
1724         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1725         mio->del_pending--;
1726
1727         if (req->state & XS_FAILED){
1728                 mio->err = 1;
1729         }
1730         if (mn){
1731                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1732                                 mn, mn->object, mio, req);
1733                 // assert mn->flags & MF_OBJECT_DELETING
1734                 mn->flags &= ~MF_OBJECT_DELETING;
1735                 mn->flags |= MF_OBJECT_DESTROYED;
1736                 signal_mapnode(mn);
1737                 /* put mapnode here, matches get_mapnode on do_destroy */
1738                 put_mapnode(mn);
1739         } else {
1740                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1741                                 mio, req);
1742         }
1743         xseg_put_request(peer->xseg, req, pr->portno);
1744         signal_pr(pr);
1745 }
1746
1747 void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1748 {
1749         struct peerd *peer = pr->peer;
1750         struct mapperd *mapper = __get_mapperd(peer);
1751         (void)mapper;
1752         struct mapper_io *mio = __get_mapper_io(pr);
1753         struct map_node *mn = __get_copyup_node(mio, req);
1754         if (!mn){
1755                 XSEGLOG2(&lc, E, "Cannot get map node");
1756                 goto out_err;
1757         }
1758         __set_copyup_node(mio, req, NULL);
1759
1760         if (req->state & XS_FAILED){
1761                 if (req->op == X_DELETE){
1762                         XSEGLOG2(&lc, E, "Delete req failed");
1763                         goto out_ok;
1764                 }
1765                 XSEGLOG2(&lc, E, "Req failed");
1766                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1767                 mn->flags &= ~MF_OBJECT_WRITING;
1768                 goto out_err;
1769         }
1770
1771         if (req->op == X_WRITE) {
1772                 char old_object_name[MAX_OBJECT_LEN + 1];
1773                 uint32_t old_objectlen;
1774
1775                 char *target = xseg_get_target(peer->xseg, req);
1776                 (void)target;
1777                 //assert mn->flags & MF_OBJECT_WRITING
1778                 mn->flags &= ~MF_OBJECT_WRITING;
1779                 strncpy(old_object_name, mn->object, mn->objectlen);
1780                 old_objectlen = mn->objectlen;
1781
1782                 struct map_node tmp;
1783                 char *data = xseg_get_data(peer->xseg, req);
1784                 map_to_object(&tmp, (unsigned char *) data);
1785                 mn->flags &= ~MF_OBJECT_EXIST;
1786
1787                 strncpy(mn->object, tmp.object, tmp.objectlen);
1788                 mn->object[tmp.objectlen] = 0;
1789                 mn->objectlen = tmp.objectlen;
1790                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1791                 //signal_mapnode since Snapshot was successfull
1792                 signal_mapnode(mn);
1793
1794                 //do delete old object
1795                 strncpy(tmp.object, old_object_name, old_objectlen);
1796                 tmp.object[old_objectlen] = 0;
1797                 tmp.objectlen = old_objectlen;
1798                 tmp.flags = MF_OBJECT_EXIST;
1799                 struct xseg_request *xreq = __delete_object(pr, &tmp);
1800                 if (!xreq){
1801                         //just a warning. Snapshot was successfull
1802                         XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1803                         goto out_ok;
1804                 }
1805                 //overwrite copyup node, since tmp is a stack dummy variable
1806                 __set_copyup_node (mio, xreq, mn);
1807                 XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1808         } else if (req->op == X_SNAPSHOT) {
1809                 //issue write_object;
1810                 mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1811                 struct map *map = mn->map;
1812                 if (!map){
1813                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1814                         goto out_err;
1815                 }
1816
1817                 /* construct a tmp map_node for writing purposes */
1818                 //char *target = xseg_get_target(peer->xseg, req);
1819                 struct map_node newmn = *mn;
1820                 newmn.flags = 0;
1821                 struct xseg_reply_snapshot *xreply;
1822                 xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1823                 //assert xreply->targetlen !=0
1824                 //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1825                 //xreply->target[xreply->targetlen] = 0;
1826                 //assert xreply->target valid
1827                 strncpy(newmn.object, xreply->target, xreply->targetlen);
1828                 newmn.object[req->targetlen] = 0;
1829                 newmn.objectlen = req->targetlen;
1830                 newmn.objectidx = mn->objectidx;
1831                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1832                 if (!xreq){
1833                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1834                                         "\n\t of map %s [%llu]",
1835                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1836                         goto out_err;
1837                 }
1838                 mn->flags |= MF_OBJECT_WRITING;
1839                 __set_copyup_node (mio, xreq, mn);
1840
1841                 XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1842         } else if (req->op == X_DELETE){
1843                 //deletion of the old block completed
1844                 XSEGLOG2(&lc, I, "Deletion of completed");
1845                 goto out_ok;
1846                 ;
1847         } else {
1848                 //wtf??
1849                 ;
1850         }
1851
1852 out:
1853         xseg_put_request(peer->xseg, req, pr->portno);
1854         return;
1855
1856 out_err:
1857         mio->snap_pending--;
1858         XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1859         mio->err = 1;
1860         if (mn)
1861                 signal_mapnode(mn);
1862         signal_pr(pr);
1863         goto out;
1864
1865 out_ok:
1866         mio->snap_pending--;
1867         signal_pr(pr);
1868         goto out;
1869
1870
1871 }
1872 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1873 {
1874         struct peerd *peer = pr->peer;
1875         struct mapperd *mapper = __get_mapperd(peer);
1876         (void)mapper;
1877         struct mapper_io *mio = __get_mapper_io(pr);
1878         struct map_node *mn = __get_copyup_node(mio, req);
1879         if (!mn){
1880                 XSEGLOG2(&lc, E, "Cannot get map node");
1881                 goto out_err;
1882         }
1883         __set_copyup_node(mio, req, NULL);
1884
1885         if (req->state & XS_FAILED){
1886                 XSEGLOG2(&lc, E, "Req failed");
1887                 mn->flags &= ~MF_OBJECT_COPYING;
1888                 mn->flags &= ~MF_OBJECT_WRITING;
1889                 goto out_err;
1890         }
1891         if (req->op == X_WRITE) {
1892                 char *target = xseg_get_target(peer->xseg, req);
1893                 (void)target;
1894                 //printf("handle object write replyi\n");
1895                 __set_copyup_node(mio, req, NULL);
1896                 //assert mn->flags & MF_OBJECT_WRITING
1897                 mn->flags &= ~MF_OBJECT_WRITING;
1898
1899                 struct map_node tmp;
1900                 char *data = xseg_get_data(peer->xseg, req);
1901                 map_to_object(&tmp, (unsigned char *) data);
1902                 mn->flags |= MF_OBJECT_EXIST;
1903                 if (mn->flags != MF_OBJECT_EXIST){
1904                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1905                         goto out_err;
1906                 }
1907                 //assert mn->flags & MF_OBJECT_EXIST
1908                 strncpy(mn->object, tmp.object, tmp.objectlen);
1909                 mn->object[tmp.objectlen] = 0;
1910                 mn->objectlen = tmp.objectlen;
1911                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1912                 mio->copyups--;
1913                 signal_mapnode(mn);
1914                 signal_pr(pr);
1915         } else if (req->op == X_COPY) {
1916         //      issue write_object;
1917                 mn->flags &= ~MF_OBJECT_COPYING;
1918                 struct map *map = mn->map;
1919                 if (!map){
1920                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1921                         goto out_err;
1922                 }
1923
1924                 /* construct a tmp map_node for writing purposes */
1925                 char *target = xseg_get_target(peer->xseg, req);
1926                 struct map_node newmn = *mn;
1927                 newmn.flags = MF_OBJECT_EXIST;
1928                 strncpy(newmn.object, target, req->targetlen);
1929                 newmn.object[req->targetlen] = 0;
1930                 newmn.objectlen = req->targetlen;
1931                 newmn.objectidx = mn->objectidx; 
1932                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1933                 if (!xreq){
1934                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1935                                         "\n\t of map %s [%llu]",
1936                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1937                         goto out_err;
1938                 }
1939                 mn->flags |= MF_OBJECT_WRITING;
1940                 __set_copyup_node (mio, xreq, mn);
1941
1942                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1943         } else {
1944                 //wtf??
1945                 ;
1946         }
1947
1948 out:
1949         xseg_put_request(peer->xseg, req, pr->portno);
1950         return;
1951
1952 out_err:
1953         mio->copyups--;
1954         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1955         mio->err = 1;
1956         if (mn)
1957                 signal_mapnode(mn);
1958         signal_pr(pr);
1959         goto out;
1960
1961 }
1962
1963 struct r2o {
1964         struct map_node *mn;
1965         uint64_t offset;
1966         uint64_t size;
1967 };
1968
1969 static int req2objs(struct peer_req *pr, struct map *map, int write)
1970 {
1971         int r = 0;
1972         struct peerd *peer = pr->peer;
1973         struct mapper_io *mio = __get_mapper_io(pr);
1974         char *target = xseg_get_target(peer->xseg, pr->req);
1975         uint32_t nr_objs = calc_nr_obj(pr->req);
1976         uint64_t size = sizeof(struct xseg_reply_map) + 
1977                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1978         uint32_t idx, i;
1979         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1980         struct map_node *mn;
1981         mio->copyups = 0;
1982         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1983
1984         /* get map_nodes of request */
1985         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1986         if (!mns){
1987                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1988                 return -1;
1989         }
1990         idx = 0;
1991         rem_size = pr->req->size;
1992         obj_index = pr->req->offset / block_size;
1993         obj_offset = pr->req->offset & (block_size -1); //modulo
1994         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1995         mn = get_mapnode(map, obj_index);
1996         if (!mn) {
1997                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1998                 r = -1;
1999                 goto out;
2000         }
2001         mns[idx].mn = mn;
2002         mns[idx].offset = obj_offset;
2003         mns[idx].size = obj_size;
2004         rem_size -= obj_size;
2005         while (rem_size > 0) {
2006                 idx++;
2007                 obj_index++;
2008                 obj_offset = 0;
2009                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
2010                 rem_size -= obj_size;
2011                 mn = get_mapnode(map, obj_index);
2012                 if (!mn) {
2013                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2014                         r = -1;
2015                         goto out;
2016                 }
2017                 mns[idx].mn = mn;
2018                 mns[idx].offset = obj_offset;
2019                 mns[idx].size = obj_size;
2020         }
2021         if (write) {
2022                 int can_wait = 0;
2023                 mio->cb=copyup_cb;
2024                 /* do a first scan and issue as many copyups as we can.
2025                  * then retry and wait when an object is not ready.
2026                  * this could be done better, since now we wait also on the
2027                  * pending copyups
2028                  */
2029                 int j;
2030                 for (j = 0; j < 2 && !mio->err; j++) {
2031                         for (i = 0; i < (idx+1); i++) {
2032                                 mn = mns[i].mn;
2033                                 //do copyups
2034                                 if (mn->flags & MF_OBJECT_NOT_READY){
2035                                         if (!can_wait)
2036                                                 continue;
2037                                         if (mn->flags & MF_OBJECT_NOT_READY)
2038                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2039                                         if (mn->flags & MF_OBJECT_DESTROYED){
2040                                                 mio->err = 1;
2041                                                 continue;
2042                                         }
2043                                 }
2044
2045                                 if (!(mn->flags & MF_OBJECT_EXIST)) {
2046                                         //calc new_target, copy up object
2047                                         if (copyup_object(peer, mn, pr) == NULL){
2048                                                 XSEGLOG2(&lc, E, "Error in copy up object");
2049                                                 mio->err = 1;
2050                                         } else {
2051                                                 mio->copyups++;
2052                                         }
2053                                 }
2054
2055                                 if (mio->err){
2056                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2057                                         break;
2058                                 }
2059                         }
2060                         can_wait = 1;
2061                 }
2062                 if (mio->copyups > 0)
2063                         wait_on_pr(pr, mio->copyups > 0);
2064         }
2065
2066         if (mio->err){
2067                 r = -1;
2068                 XSEGLOG2(&lc, E, "Mio->err");
2069                 goto out;
2070         }
2071
2072         /* resize request to fit reply */
2073         char buf[XSEG_MAX_TARGETLEN];
2074         strncpy(buf, target, pr->req->targetlen);
2075         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2076         if (r < 0) {
2077                 XSEGLOG2(&lc, E, "Cannot resize request");
2078                 goto out;
2079         }
2080         target = xseg_get_target(peer->xseg, pr->req);
2081         strncpy(target, buf, pr->req->targetlen);
2082
2083         /* structure reply */
2084         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2085         reply->cnt = nr_objs;
2086         for (i = 0; i < (idx+1); i++) {
2087                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2088                 reply->segs[i].targetlen = mns[i].mn->objectlen;
2089                 reply->segs[i].offset = mns[i].offset;
2090                 reply->segs[i].size = mns[i].size;
2091         }
2092 out:
2093         for (i = 0; i < idx; i++) {
2094                 put_mapnode(mns[i].mn);
2095         }
2096         free(mns);
2097         mio->cb = NULL;
2098         return r;
2099 }
2100
2101 static int do_dropcache(struct peer_req *pr, struct map *map)
2102 {
2103         struct map_node *mn;
2104         struct peerd *peer = pr->peer;
2105         struct mapperd *mapper = __get_mapperd(peer);
2106         uint64_t i;
2107         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2108         map->flags |= MF_MAP_DROPPING_CACHE;
2109         for (i = 0; i < calc_map_obj(map); i++) {
2110                 mn = get_mapnode(map, i);
2111                 if (mn) {
2112                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
2113                                 //make sure all pending operations on all objects are completed
2114                                 if (mn->flags & MF_OBJECT_NOT_READY)
2115                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2116                                 mn->flags |= MF_OBJECT_DESTROYED;
2117                         }
2118                         put_mapnode(mn);
2119                 }
2120         }
2121         map->flags &= ~MF_MAP_DROPPING_CACHE;
2122         map->flags |= MF_MAP_DESTROYED;
2123         remove_map(mapper, map);
2124         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2125         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
2126         return 0;
2127 }
2128
2129 static int do_info(struct peer_req *pr, struct map *map)
2130 {
2131         struct peerd *peer = pr->peer;
2132         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2133         xinfo->size = map->size;
2134         return 0;
2135 }
2136
2137
2138 static int do_open(struct peer_req *pr, struct map *map)
2139 {
2140         if (map->flags & MF_MAP_EXCLUSIVE){
2141                 return 0;
2142         }
2143         else {
2144                 return -1;
2145         }
2146 }
2147
2148 static int do_close(struct peer_req *pr, struct map *map)
2149 {
2150         if (map->flags & MF_MAP_EXCLUSIVE){
2151                 /* do not drop cache if close failed and map not deleted */
2152                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2153                         return -1;
2154         }
2155         return do_dropcache(pr, map);
2156 }
2157
2158 static int do_snapshot(struct peer_req *pr, struct map *map)
2159 {
2160         uint64_t i;
2161         struct peerd *peer = pr->peer;
2162         struct mapper_io *mio = __get_mapper_io(pr);
2163         struct map_node *mn;
2164         struct xseg_request *req;
2165
2166         if (!(map->flags & MF_MAP_EXCLUSIVE)){
2167                 XSEGLOG2(&lc, E, "Map was not opened exclusively");
2168                 return -1;
2169         }
2170         XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2171         map->flags |= MF_MAP_SNAPSHOTTING;
2172
2173         uint64_t nr_obj = calc_map_obj(map);
2174         mio->cb = snapshot_cb;
2175         mio->snap_pending = 0;
2176         mio->err = 0;
2177         for (i = 0; i < nr_obj; i++){
2178
2179                 /* throttle pending snapshots
2180                  * this should be nr_ops of the blocker, but since we don't know
2181                  * that, we assume based on our own nr_ops
2182                  */
2183                 if (mio->snap_pending >= peer->nr_ops)
2184                         wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2185
2186                 mn = get_mapnode(map, i);
2187                 if (!mn)
2188                         //warning?
2189                         continue;
2190                 if (!(mn->flags & MF_OBJECT_EXIST)){
2191                         put_mapnode(mn);
2192                         continue;
2193                 }
2194                 // make sure all pending operations on all objects are completed
2195                 if (mn->flags & MF_OBJECT_NOT_READY)
2196                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2197
2198                 /* TODO will this ever happen?? */
2199                 if (mn->flags & MF_OBJECT_DESTROYED){
2200                         put_mapnode(mn);
2201                         continue;
2202                 }
2203
2204                 req = __snapshot_object(pr, mn);
2205                 if (!req){
2206                         mio->err = 1;
2207                         put_mapnode(mn);
2208                         break;
2209                 }
2210                 mio->snap_pending++;
2211                 /* do not put_mapnode here. cb does that */
2212         }
2213
2214         if (mio->snap_pending > 0)
2215                 wait_on_pr(pr, mio->snap_pending > 0);
2216         mio->cb = NULL;
2217
2218         if (mio->err)
2219                 goto out_err;
2220
2221         /* calculate name of snapshot */
2222         struct map tmp_map = *map;
2223         unsigned char sha[SHA256_DIGEST_SIZE];
2224         unsigned char *buf = malloc(block_size);
2225         char newvolumename[MAX_VOLUME_LEN];
2226         uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2227         uint64_t pos = 0;
2228         uint64_t max_objidx = calc_map_obj(map);
2229         int r;
2230
2231         for (i = 0; i < max_objidx; i++) {
2232                 mn = find_object(map, i);
2233                 if (!mn){
2234                         XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2235                                         (unsigned long long) i, map->volume);
2236                         goto out_err;
2237                 }
2238                 v0_object_to_map(mn, buf+pos);
2239                 pos += v0_objectsize_in_map;
2240         }
2241 //      SHA256(buf, pos, sha);
2242         merkle_hash(buf, pos, sha);
2243         hexlify(sha, newvolumename);
2244         strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2245         tmp_map.volumelen = newvolumenamelen;
2246         free(buf);
2247         tmp_map.version = 0; // set volume version to pithos image
2248
2249         /* write the map of the Snapshot */
2250         r = write_map(pr, &tmp_map);
2251         if (r < 0)
2252                 goto out_err;
2253         char targetbuf[XSEG_MAX_TARGETLEN];
2254         char *target = xseg_get_target(peer->xseg, pr->req);
2255         strncpy(targetbuf, target, pr->req->targetlen);
2256         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2257                         sizeof(struct xseg_reply_snapshot));
2258         if (r < 0){
2259                 XSEGLOG2(&lc, E, "Cannot resize request");
2260                 goto out_err;
2261         }
2262         target = xseg_get_target(peer->xseg, pr->req);
2263         strncpy(target, targetbuf, pr->req->targetlen);
2264
2265         struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2266                                                 xseg_get_data(peer->xseg, pr->req);
2267         strncpy(xreply->target, newvolumename, newvolumenamelen);
2268         xreply->targetlen = newvolumenamelen;
2269         map->flags &= ~MF_MAP_SNAPSHOTTING;
2270         XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2271         return 0;
2272
2273 out_err:
2274         map->flags &= ~MF_MAP_SNAPSHOTTING;
2275         XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2276         return -1;
2277 }
2278
2279
2280 static int do_destroy(struct peer_req *pr, struct map *map)
2281 {
2282         uint64_t i;
2283         struct peerd *peer = pr->peer;
2284         struct mapper_io *mio = __get_mapper_io(pr);
2285         struct map_node *mn;
2286         struct xseg_request *req;
2287
2288         if (!(map->flags & MF_MAP_EXCLUSIVE))
2289                 return -1;
2290
2291         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
2292         req = __delete_map(pr, map);
2293         if (!req)
2294                 return -1;
2295         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
2296         if (req->state & XS_FAILED){
2297                 xseg_put_request(peer->xseg, req, pr->portno);
2298                 map->flags &= ~MF_MAP_DELETING;
2299                 return -1;
2300         }
2301         xseg_put_request(peer->xseg, req, pr->portno);
2302
2303         uint64_t nr_obj = calc_map_obj(map);
2304         mio->cb = deletion_cb;
2305         mio->del_pending = 0;
2306         mio->err = 0;
2307         for (i = 0; i < nr_obj; i++){
2308
2309                 /* throttle pending deletions
2310                  * this should be nr_ops of the blocker, but since we don't know
2311                  * that, we assume based on our own nr_ops
2312                  */
2313                 if (mio->del_pending >= peer->nr_ops)
2314                         wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
2315
2316                 mn = get_mapnode(map, i);
2317                 if (!mn)
2318                         continue;
2319                 if (mn->flags & MF_OBJECT_DESTROYED){
2320                         put_mapnode(mn);
2321                         continue;
2322                 }
2323                 if (!(mn->flags & MF_OBJECT_EXIST)){
2324                         mn->flags |= MF_OBJECT_DESTROYED;
2325                         put_mapnode(mn);
2326                         continue;
2327                 }
2328
2329                 // make sure all pending operations on all objects are completed
2330                 if (mn->flags & MF_OBJECT_NOT_READY)
2331                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2332
2333                 req = __delete_object(pr, mn);
2334                 if (!req){
2335                         mio->err = 1;
2336                         put_mapnode(mn);
2337                         continue;
2338                 }
2339                 mio->del_pending++;
2340                 /* do not put_mapnode here. cb does that */
2341         }
2342
2343         if (mio->del_pending > 0)
2344                 wait_on_pr(pr, mio->del_pending > 0);
2345
2346         mio->cb = NULL;
2347         map->flags &= ~MF_MAP_DELETING;
2348         map->flags |= MF_MAP_DELETED;
2349         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
2350         return do_close(pr, map);
2351 }
2352
2353 static int do_mapr(struct peer_req *pr, struct map *map)
2354 {
2355         struct peerd *peer = pr->peer;
2356         int r = req2objs(pr, map, 0);
2357         if  (r < 0){
2358                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2359                                 map->volume, 
2360                                 (unsigned long long) pr->req->offset, 
2361                                 (unsigned long long) (pr->req->offset + pr->req->size));
2362                 return -1;
2363         }
2364         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2365                         map->volume, 
2366                         (unsigned long long) pr->req->offset, 
2367                         (unsigned long long) (pr->req->offset + pr->req->size));
2368         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2369                         (unsigned long long) pr->req->offset,
2370                         (unsigned long long) pr->req->size);
2371         char buf[XSEG_MAX_TARGETLEN+1];
2372         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2373         int i;
2374         for (i = 0; i < reply->cnt; i++) {
2375                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2376                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2377                 buf[reply->segs[i].targetlen] = 0;
2378                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2379                                 (unsigned long long) reply->segs[i].offset,
2380                                 (unsigned long long) reply->segs[i].size);
2381         }
2382         return 0;
2383 }
2384
2385 static int do_mapw(struct peer_req *pr, struct map *map)
2386 {
2387         struct peerd *peer = pr->peer;
2388         int r = req2objs(pr, map, 1);
2389         if  (r < 0){
2390                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2391                                 map->volume, 
2392                                 (unsigned long long) pr->req->offset, 
2393                                 (unsigned long long) (pr->req->offset + pr->req->size));
2394                 return -1;
2395         }
2396         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2397                         map->volume, 
2398                         (unsigned long long) pr->req->offset, 
2399                         (unsigned long long) (pr->req->offset + pr->req->size));
2400         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2401                         (unsigned long long) pr->req->offset,
2402                         (unsigned long long) pr->req->size);
2403         char buf[XSEG_MAX_TARGETLEN+1];
2404         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2405         int i;
2406         for (i = 0; i < reply->cnt; i++) {
2407                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2408                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2409                 buf[reply->segs[i].targetlen] = 0;
2410                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2411                                 (unsigned long long) reply->segs[i].offset,
2412                                 (unsigned long long) reply->segs[i].size);
2413         }
2414         return 0;
2415 }
2416
2417 //here map is the parent map
2418 static int do_clone(struct peer_req *pr, struct map *map)
2419 {
2420         int r;
2421         struct peerd *peer = pr->peer;
2422         struct mapperd *mapper = __get_mapperd(peer);
2423         char *target = xseg_get_target(peer->xseg, pr->req);
2424         struct map *clonemap;
2425         struct xseg_request_clone *xclone =
2426                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2427
2428         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
2429
2430         clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2431         if (!clonemap)
2432                 return -1;
2433
2434         /* open map to get exclusive access to map */
2435         r = open_map(pr, clonemap, 0);
2436         if (r < 0){
2437                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
2438                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2439                 goto out_err;
2440         }
2441         r = load_map(pr, clonemap);
2442         if (r >= 0) {
2443                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
2444                 goto out_err;
2445         }
2446
2447         if (xclone->size == -1)
2448                 clonemap->size = map->size;
2449         else
2450                 clonemap->size = xclone->size;
2451         if (clonemap->size < map->size){
2452                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
2453                                 "\n\t for requested clone %s",
2454                                 (unsigned long long) xclone->size,
2455                                 (unsigned long long) map->size, clonemap->volume);
2456                 goto out_err;
2457         }
2458         if (clonemap->size > MAX_VOLUME_SIZE) {
2459                 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
2460                                 "\n\t for volume %s",
2461                                 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
2462                 goto out_err;
2463         }
2464
2465         //alloc and init map_nodes
2466         //unsigned long c = clonemap->size/block_size + 1;
2467         unsigned long c = calc_map_obj(clonemap);
2468         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
2469         if (!map_nodes){
2470                 goto out_err;
2471         }
2472         int i;
2473         //for (i = 0; i < clonemap->size/block_size + 1; i++) {
2474         for (i = 0; i < c; i++) {
2475                 struct map_node *mn = get_mapnode(map, i);
2476                 if (mn) {
2477                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
2478                         map_nodes[i].objectlen = mn->objectlen;
2479                         put_mapnode(mn);
2480                 } else {
2481                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2482                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2483                 }
2484                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2485                 map_nodes[i].flags = 0;
2486                 map_nodes[i].objectidx = i;
2487                 map_nodes[i].map = clonemap;
2488                 map_nodes[i].ref = 1;
2489                 map_nodes[i].waiters = 0;
2490                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2491                 r = insert_object(clonemap, &map_nodes[i]);
2492                 if (r < 0){
2493                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
2494                         goto out_err;
2495                 }
2496         }
2497
2498         r = write_map(pr, clonemap);
2499         if (r < 0){
2500                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
2501                 goto out_err;
2502         }
2503         do_close(pr, clonemap);
2504         return 0;
2505
2506 out_err:
2507         do_close(pr, clonemap);
2508         return -1;
2509 }
2510
2511 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2512 {
2513         int r, opened = 0;
2514         if (flags & MF_EXCLUSIVE){
2515                 r = open_map(pr, map, flags);
2516                 if (r < 0) {
2517                         if (flags & MF_FORCE){
2518                                 return -1;
2519                         }
2520                 } else {
2521                         opened = 1;
2522                 }
2523         }
2524         r = load_map(pr, map);
2525         if (r < 0 && opened){
2526                 close_map(pr, map);
2527         }
2528         return r;
2529 }
2530
2531 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2532                         uint32_t flags)
2533 {
2534         int r;
2535         struct peerd *peer = pr->peer;
2536         struct mapperd *mapper = __get_mapperd(peer);
2537         struct map *map = find_map_len(mapper, name, namelen, flags);
2538         if (!map){
2539                 if (flags & MF_LOAD){
2540                         map = create_map(mapper, name, namelen, flags);
2541                         if (!map)
2542                                 return NULL;
2543                         r = open_load_map(pr, map, flags);
2544                         if (r < 0){
2545                                 do_dropcache(pr, map);
2546                                 return NULL;
2547                         }
2548                 } else {
2549                         return NULL;
2550                 }
2551         } else if (map->flags & MF_MAP_DESTROYED){
2552                 return NULL;
2553         }
2554         __get_map(map);
2555         return map;
2556
2557 }
2558
2559 static int map_action(int (action)(struct peer_req *pr, struct map *map),
2560                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2561 {
2562         //struct peerd *peer = pr->peer;
2563         struct map *map;
2564 start:
2565         map = get_map(pr, name, namelen, flags);
2566         if (!map)
2567                 return -1;
2568         if (map->flags & MF_MAP_NOT_READY){
2569                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2570                 put_map(map);
2571                 goto start;
2572         }
2573         int r = action(pr, map);
2574         //always drop cache if map not read exclusively
2575         if (!(map->flags & MF_MAP_EXCLUSIVE))
2576                 do_dropcache(pr, map);
2577         signal_map(map);
2578         put_map(map);
2579         return r;
2580 }
2581
2582 void * handle_info(struct peer_req *pr)
2583 {
2584         struct peerd *peer = pr->peer;
2585         char *target = xseg_get_target(peer->xseg, pr->req);
2586         int r = map_action(do_info, pr, target, pr->req->targetlen,
2587                                 MF_ARCHIP|MF_LOAD);
2588         if (r < 0)
2589                 fail(peer, pr);
2590         else
2591                 complete(peer, pr);
2592         ta--;
2593         return NULL;
2594 }
2595
2596 void * handle_clone(struct peer_req *pr)
2597 {
2598         int r;
2599         struct peerd *peer = pr->peer;
2600         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
2601         if (!xclone) {
2602                 r = -1;
2603                 goto out;
2604         }
2605
2606         if (xclone->targetlen){
2607                 /* if snap was defined */
2608                 //support clone only from pithos
2609                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
2610                                         MF_LOAD);
2611         } else {
2612                 /* else try to create a new volume */
2613                 XSEGLOG2(&lc, I, "Creating volume");
2614                 if (!xclone->size){
2615                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
2616                         r = -1;
2617                         goto out;
2618                 }
2619                 if (xclone->size > MAX_VOLUME_SIZE) {
2620                         XSEGLOG2(&lc, E, "Requested size %llu > max volume "
2621                                         "size %llu", xclone->size, MAX_VOLUME_SIZE);
2622                         r = -1;
2623                         goto out;
2624                 }
2625
2626                 struct map *map;
2627                 char *target = xseg_get_target(peer->xseg, pr->req);
2628
2629                 //create a new empty map of size
2630                 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
2631                 if (!map){
2632                         r = -1;
2633                         goto out;
2634                 }
2635                 /* open map to get exclusive access to map */
2636                 r = open_map(pr, map, 0);
2637                 if (r < 0){
2638                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
2639                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
2640                         do_dropcache(pr, map);
2641                         r = -1;
2642                         goto out;
2643                 }
2644                 r = load_map(pr, map);
2645                 if (r >= 0) {
2646                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
2647                         do_close(pr, map);
2648                         r = -1;
2649                         goto out;
2650                 }
2651                 map->size = xclone->size;
2652                 //populate_map with zero objects;
2653                 uint64_t nr_objs = xclone->size / block_size;
2654                 if (xclone->size % block_size)
2655                         nr_objs++;
2656
2657                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2658                 if (!map_nodes){
2659                         do_close(pr, map);
2660                         r = -1;
2661                         goto out;
2662                 }
2663
2664                 uint64_t i;
2665                 for (i = 0; i < nr_objs; i++) {
2666                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2667                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2668                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2669                         map_nodes[i].flags = 0;
2670                         map_nodes[i].objectidx = i;
2671                         map_nodes[i].map = map;
2672                         map_nodes[i].ref = 1;
2673                         map_nodes[i].waiters = 0;
2674                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2675                         r = insert_object(map, &map_nodes[i]);
2676                         if (r < 0){
2677                                 do_close(pr, map);
2678                                 r = -1;
2679                                 goto out;
2680                         }
2681                 }
2682                 r = write_map(pr, map);
2683                 if (r < 0){
2684                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2685                         do_close(pr, map);
2686                         goto out;
2687                 }
2688                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2689                 r = 0;
2690                 do_close(pr, map); //drop cache here for consistency
2691         }
2692 out:
2693         if (r < 0)
2694                 fail(peer, pr);
2695         else
2696                 complete(peer, pr);
2697         ta--;
2698         return NULL;
2699 }
2700
2701 void * handle_mapr(struct peer_req *pr)
2702 {
2703         struct peerd *peer = pr->peer;
2704         char *target = xseg_get_target(peer->xseg, pr->req);
2705         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2706                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2707         if (r < 0)
2708                 fail(peer, pr);
2709         else
2710                 complete(peer, pr);
2711         ta--;
2712         return NULL;
2713 }
2714
2715 void * handle_mapw(struct peer_req *pr)
2716 {
2717         struct peerd *peer = pr->peer;
2718         char *target = xseg_get_target(peer->xseg, pr->req);
2719         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2720                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2721         if (r < 0)
2722                 fail(peer, pr);
2723         else
2724                 complete(peer, pr);
2725         XSEGLOG2(&lc, D, "Ta: %d", ta);
2726         ta--;
2727         return NULL;
2728 }
2729
2730 void * handle_destroy(struct peer_req *pr)
2731 {
2732         struct peerd *peer = pr->peer;
2733         char *target = xseg_get_target(peer->xseg, pr->req);
2734         /* request EXCLUSIVE access, but do not force it.
2735          * check if succeeded on do_destroy
2736          */
2737         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2738                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2739         if (r < 0)
2740                 fail(peer, pr);
2741         else
2742                 complete(peer, pr);
2743         ta--;
2744         return NULL;
2745 }
2746
2747 void * handle_open(struct peer_req *pr)
2748 {
2749         struct peerd *peer = pr->peer;
2750         char *target = xseg_get_target(peer->xseg, pr->req);
2751         //here we do not want to load
2752         int r = map_action(do_open, pr, target, pr->req->targetlen,
2753                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2754         if (r < 0)
2755                 fail(peer, pr);
2756         else
2757                 complete(peer, pr);
2758         ta--;
2759         return NULL;
2760 }
2761
2762 void * handle_close(struct peer_req *pr)
2763 {
2764         struct peerd *peer = pr->peer;
2765         char *target = xseg_get_target(peer->xseg, pr->req);
2766         //here we do not want to load
2767         int r = map_action(do_close, pr, target, pr->req->targetlen,
2768                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2769         if (r < 0)
2770                 fail(peer, pr);
2771         else
2772                 complete(peer, pr);
2773         ta--;
2774         return NULL;
2775 }
2776
2777 void * handle_snapshot(struct peer_req *pr)
2778 {
2779         struct peerd *peer = pr->peer;
2780         char *target = xseg_get_target(peer->xseg, pr->req);
2781         /* request EXCLUSIVE access, but do not force it.
2782          * check if succeeded on do_snapshot
2783          */
2784         int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2785                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2786         if (r < 0)
2787                 fail(peer, pr);
2788         else
2789                 complete(peer, pr);
2790         ta--;
2791         return NULL;
2792 }
2793
2794 int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2795                         struct xseg_request *req)
2796 {
2797         //struct mapperd *mapper = __get_mapperd(peer);
2798         struct mapper_io *mio = __get_mapper_io(pr);
2799         void *(*action)(struct peer_req *) = NULL;
2800
2801         mio->state = ACCEPTED;
2802         mio->err = 0;
2803         mio->cb = NULL;
2804         switch (pr->req->op) {
2805                 /* primary xseg operations of mapper */
2806                 case X_CLONE: action = handle_clone; break;
2807                 case X_MAPR: action = handle_mapr; break;
2808                 case X_MAPW: action = handle_mapw; break;
2809                 case X_SNAPSHOT: action = handle_snapshot; break;
2810                 case X_INFO: action = handle_info; break;
2811                 case X_DELETE: action = handle_destroy; break;
2812                 case X_OPEN: action = handle_open; break;
2813                 case X_CLOSE: action = handle_close; break;
2814                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2815         }
2816         if (action){
2817                 ta++;
2818                 mio->active = 1;
2819                 st_thread_create(action, pr, 0, 0);
2820         }
2821         return 0;
2822
2823 }
2824
2825 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2826                 enum dispatch_reason reason)
2827 {
2828         struct mapperd *mapper = __get_mapperd(peer);
2829         (void) mapper;
2830         struct mapper_io *mio = __get_mapper_io(pr);
2831         (void) mio;
2832
2833
2834         if (reason == dispatch_accept)
2835                 dispatch_accepted(peer, pr, req);
2836         else {
2837                 if (mio->cb){
2838                         mio->cb(pr, req);
2839                 } else { 
2840                         signal_pr(pr);
2841                 }
2842         }
2843         return 0;
2844 }
2845
2846 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2847 {
2848         int i;
2849
2850         //FIXME error checks
2851         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2852         peer->priv = mapperd;
2853         mapper = mapperd;
2854         mapper->hashmaps = xhash_new(3, STRING);
2855
2856         for (i = 0; i < peer->nr_ops; i++) {
2857                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2858                 mio->copyups_nodes = xhash_new(3, INTEGER);
2859                 mio->copyups = 0;
2860                 mio->err = 0;
2861                 mio->active = 0;
2862                 peer->peer_reqs[i].priv = mio;
2863         }
2864
2865         mapper->bportno = -1;
2866         mapper->mbportno = -1;
2867         BEGIN_READ_ARGS(argc, argv);
2868         READ_ARG_ULONG("-bp", mapper->bportno);
2869         READ_ARG_ULONG("-mbp", mapper->mbportno);
2870         END_READ_ARGS();
2871         if (mapper->bportno == -1){
2872                 XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2873                 usage(argv[0]);
2874                 return -1;
2875         }
2876         if (mapper->mbportno == -1){
2877                 XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2878                 usage(argv[0]);
2879                 return -1;
2880         }
2881
2882         const struct sched_param param = { .sched_priority = 99 };
2883         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2884         /* FIXME maybe place it in peer
2885          * should be done for each port (sportno to eportno)
2886          */
2887         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2888         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2889
2890
2891 //      test_map(peer);
2892
2893         return 0;
2894 }
2895
2896 /* FIXME this should not be here */
2897 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2898 {
2899         struct xseg *xseg = peer->xseg;
2900         xport portno_start = peer->portno_start;
2901         xport portno_end = peer->portno_end;
2902         struct peer_req *pr;
2903         xport i;
2904         int  r, c = 0;
2905         struct xseg_request *received;
2906         xseg_prepare_wait(xseg, portno_start);
2907         while(1) {
2908                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2909                 c = 1;
2910                 while (c){
2911                         c = 0;
2912                         for (i = portno_start; i <= portno_end; i++) {
2913                                 received = xseg_receive(xseg, i, 0);
2914                                 if (received) {
2915                                         c = 1;
2916                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2917                                         if (r < 0 || !pr || received != expected_req){
2918                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2919                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2920                                                 if (p == NoPort){
2921                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2922                                                         xseg_put_request(xseg, received, portno_start);
2923                                                         continue;
2924                                                 } else {
2925                                                         xseg_signal(xseg, p);
2926                                                 }
2927                                         } else {
2928                                                 xseg_cancel_wait(xseg, portno_start);
2929                                                 return 0;
2930                                         }
2931                                 }
2932                         }
2933                 }
2934                 xseg_wait_signal(xseg, 1000000UL);
2935         }
2936 }
2937
2938
2939 void custom_peer_finalize(struct peerd *peer)
2940 {
2941         struct mapperd *mapper = __get_mapperd(peer);
2942         struct peer_req *pr = alloc_peer_req(peer);
2943         if (!pr){
2944                 XSEGLOG2(&lc, E, "Cannot get peer request");
2945                 return;
2946         }
2947         struct map *map;
2948         struct xseg_request *req;
2949         xhash_iter_t it;
2950         xhashidx key, val;
2951         xhash_iter_init(mapper->hashmaps, &it);
2952         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2953                 map = (struct map *)val;
2954                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2955                         continue;
2956                 req = __close_map(pr, map);
2957                 if (!req)
2958                         continue;
2959                 wait_reply(peer, req);
2960                 if (!(req->state & XS_SERVED))
2961                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2962                 map->flags &= ~MF_MAP_CLOSING;
2963                 xseg_put_request(peer->xseg, req, pr->portno);
2964         }
2965         return;
2966
2967
2968 }
2969
2970 void print_obj(struct map_node *mn)
2971 {
2972         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2973                         (unsigned long long) mn->objectidx, mn->object, 
2974                         (unsigned int) mn->objectlen, 
2975                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2976 }
2977
2978 void print_map(struct map *m)
2979 {
2980         uint64_t nr_objs = m->size/block_size;
2981         if (m->size % block_size)
2982                 nr_objs++;
2983         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2984                         m->volume, m->volumelen, 
2985                         (unsigned long long) m->size, 
2986                         (unsigned long long) nr_objs,
2987                         m->version);
2988         uint64_t i;
2989         struct map_node *mn;
2990         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2991                 return;
2992         for (i = 0; i < nr_objs; i++) {
2993                 mn = find_object(m, i);
2994                 if (!mn){
2995                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2996                         continue;
2997                 }
2998                 print_obj(mn);
2999         }
3000 }
3001
3002 /*
3003 void test_map(struct peerd *peer)
3004 {
3005         int i,j, ret;
3006         //struct sha256_ctx sha256ctx;
3007         unsigned char buf[SHA256_DIGEST_SIZE];
3008         char buf_new[XSEG_MAX_TARGETLEN + 20];
3009         struct map *m = malloc(sizeof(struct map));
3010         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
3011         m->volume[XSEG_MAX_TARGETLEN] = 0;
3012         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
3013         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
3014         m->volumelen = XSEG_MAX_TARGETLEN;
3015         m->size = 100*block_size;
3016         m->objects = xhash_new(3, INTEGER);
3017         struct map_node *map_node = calloc(100, sizeof(struct map_node));
3018         for (i = 0; i < 100; i++) {
3019                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
3020                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
3021                 
3022                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
3023                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
3024                 }
3025                 map_node[i].objectidx = i;
3026                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
3027                 map_node[i].flags = MF_OBJECT_EXIST;
3028                 ret = insert_object(m, &map_node[i]);
3029         }
3030
3031         char *data = malloc(block_size);
3032         mapheader_to_map(m, data);
3033         uint64_t pos = mapheader_size;
3034
3035         for (i = 0; i < 100; i++) {
3036                 map_node = find_object(m, i);
3037                 if (!map_node){
3038                         printf("no object node %d \n", i);
3039                         exit(1);
3040                 }
3041                 object_to_map(data+pos, map_node);
3042                 pos += objectsize_in_map;
3043         }
3044 //      print_map(m);
3045
3046         struct map *m2 = malloc(sizeof(struct map));
3047         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3048         m->volume[XSEG_MAX_TARGETLEN] = 0;
3049         m->volumelen = XSEG_MAX_TARGETLEN;
3050
3051         m2->objects = xhash_new(3, INTEGER);
3052         ret = read_map(peer, m2, data);
3053 //      print_map(m2);
3054
3055         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3056         ssize_t r, sum = 0;
3057         while (sum < block_size) {
3058                 r = write(fd, data + sum, block_size -sum);
3059                 if (r < 0){
3060                         perror("write");
3061                         printf("write error\n");
3062                         exit(1);
3063                 } 
3064                 sum += r;
3065         }
3066         close(fd);
3067         map_node = find_object(m, 0);
3068         free(map_node);
3069         free(m);
3070 }
3071 */