e48dd8964a40c86084c77c5d58f5bc1dd64571a1
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <mpeer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15
16 GCRY_THREAD_OPTION_PTHREAD_IMPL;
17
18 #define MF_PENDING 1
19
20 #define SHA256_DIGEST_SIZE 32
21 /* hex representation of sha256 value takes up double the sha256 size */
22 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
23
24 #define XSEG_MAX_TARGETLEN (SHA256_DIGEST_SIZE << 1)
25
26 #define block_size (1<<22) //FIXME this should be defined here?
27 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
28 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
29
30 #define MF_OBJECT_EXIST         (1 << 0)
31 #define MF_OBJECT_COPYING       (1 << 1)
32 #define MF_OBJECT_WRITING       (1 << 2)
33 #define MF_OBJECT_DELETING      (1 << 3)
34
35 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
36 extern struct log_ctx lc;
37
38 char *magic_string = "This a magic string. Please hash me";
39 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
40 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
41
42 //internal mapper states
43 enum mapper_state {
44         ACCEPTED = 0,
45         WRITING = 1,
46         COPYING = 2,
47         DELETING = 3
48 };
49
50 struct map_node {
51         uint32_t flags;
52         uint32_t objectidx;
53         uint32_t objectlen;
54         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
55         struct xq pending;                      /* pending peer_reqs on this object */
56         struct map *map;
57 };
58
59 #define MF_MAP_LOADING          (1 << 0)
60 #define MF_MAP_DESTROYED        (1 << 1)
61 #define MF_MAP_WRITING          (1 << 2)
62 #define MF_MAP_DELETING         (1 << 3)
63
64 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING)
65
66 struct map {
67         uint32_t flags;
68         uint64_t size;
69         uint32_t volumelen;
70         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
71         xhash_t *objects;       /* obj_index --> map_node */
72         struct xq pending;      /* pending peer_reqs on this map */
73 };
74
75 struct mapperd {
76         xport bportno;          /* blocker that accesses data */
77         xport mbportno;         /* blocker that accesses maps */
78         xhash_t *hashmaps; // hash_function(target) --> struct map
79 };
80
81 struct mapper_io {
82         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
83         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
84         struct map_node *copyup_node;
85         int err;                        /* error flag */
86         uint64_t delobj;
87         enum mapper_state state;
88 };
89
90 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
91 void print_map(struct map *m);
92
93 /*
94  * Helper functions
95  */
96
97 static inline struct mapperd * __get_mapperd(struct peerd *peer)
98 {
99         return (struct mapperd *) peer->priv;
100 }
101
102 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
103 {
104         return (struct mapper_io *) pr->priv;
105 }
106
107 static inline uint64_t calc_map_obj(struct map *map)
108 {
109         uint64_t nr_objs = map->size / block_size;
110         if (map->size % block_size)
111                 nr_objs++;
112         return nr_objs;
113 }
114
115 static uint32_t calc_nr_obj(struct xseg_request *req)
116 {
117         unsigned int r = 1;
118         uint64_t rem_size = req->size;
119         uint64_t obj_offset = req->offset & (block_size -1); //modulo
120         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
121         rem_size -= obj_size;
122         while (rem_size > 0) {
123                 obj_size = (rem_size > block_size) ? block_size : rem_size;
124                 rem_size -= obj_size;
125                 r++;
126         }
127
128         return r;
129 }
130
131 /*
132  * Maps handling functions
133  */
134
135 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
136 {
137         int r;
138         struct map *m = NULL;
139         char buf[XSEG_MAX_TARGETLEN+1];
140         //assert targetlen <= XSEG_MAX_TARGETLEN
141         strncpy(buf, target, targetlen);
142         buf[targetlen] = 0;
143         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
144         if (r < 0)
145                 return NULL;
146         return m;
147 }
148
149
150 static int insert_map(struct mapperd *mapper, struct map *map)
151 {
152         int r = -1;
153         
154         if (find_map(mapper, map->volume, map->volumelen)){
155                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
156                 goto out;
157         }
158         
159         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
160         if (r == -XHASH_ERESIZE) {
161                 xhashidx shift = xhash_grow_size_shift(map->objects);
162                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
163                 if (!new_hashmap){
164                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
165                                         (unsigned long long) shift);
166                         goto out;
167                 }
168                 mapper->hashmaps = new_hashmap;
169                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
170         }
171 out:
172         return r;
173 }
174
175 static int remove_map(struct mapperd *mapper, struct map *map)
176 {
177         int r = -1;
178         
179         //assert no pending pr on map
180         
181         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
182         if (r == -XHASH_ERESIZE) {
183                 xhashidx shift = xhash_shrink_size_shift(map->objects);
184                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
185                 if (!new_hashmap){
186                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
187                                         (unsigned long long) shift);
188                         goto out;
189                 }
190                 mapper->hashmaps = new_hashmap;
191                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
192         }
193 out:
194         return r;
195 }
196
197 /* async map load */
198 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
199 {
200         int r;
201         xport p;
202         struct xseg_request *req;
203         struct mapperd *mapper = __get_mapperd(peer);
204         void *dummy;
205         //printf("Loading map\n");
206
207         struct map *m = find_map(mapper, target, targetlen);
208         if (!m) {
209                 m = malloc(sizeof(struct map));
210                 if (!m){
211                         XSEGLOG2(&lc, E, "Cannot allocate map ");
212                         goto out_err;
213                 }
214                 m->size = -1;
215                 strncpy(m->volume, target, targetlen);
216                 m->volume[targetlen] = 0;
217                 m->volumelen = targetlen;
218                 m->flags = MF_MAP_LOADING;
219                 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
220                 if (!qidx) {
221                         XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
222                                         m->volume);
223                         goto out_map;
224                 }
225                 m->objects = xhash_new(3, INTEGER); 
226                 if (!m->objects){
227                         XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
228                                         m->volume);
229                         goto out_q;
230                 }
231                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
232         } else {
233                 goto map_exists;
234         }
235
236         r = insert_map(mapper, m);
237         if (r < 0)  
238                 goto out_hash;
239         
240         //printf("Loading map: preparing req\n");
241
242         req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
243         if (!req){
244                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
245                                 m->volume);
246                 goto out_fail;
247         }
248
249         r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
250         if (r < 0){
251                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
252                                 m->volume);
253                 goto out_put;
254         }
255
256         char *reqtarget = xseg_get_target(peer->xseg, req);
257         if (!reqtarget)
258                 goto out_put;
259         strncpy(reqtarget, target, targetlen);
260         req->op = X_READ;
261         req->size = block_size;
262         req->offset = 0;
263         r = xseg_set_req_data(peer->xseg, req, pr);
264         if (r < 0){
265                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
266                                 m->volume);
267                 goto out_put;
268         }
269         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
270         if (p == NoPort){ 
271                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
272                                 m->volume);
273                 goto out_unset;
274         }
275         r = xseg_signal(peer->xseg, p);
276         
277         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
278         return 0;
279
280 out_unset:
281         xseg_get_req_data(peer->xseg, req, &dummy);
282 out_put:
283         xseg_put_request(peer->xseg, req, peer->portno);
284
285 out_fail:
286         remove_map(mapper, m);
287         xqindex idx;
288         while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
289                 fail(peer, (struct peer_req *) idx);
290         }
291
292 out_hash:
293         xhash_free(m->objects);
294 out_q:
295         xq_free(&m->pending);
296 out_map:
297         XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
298         free(m);
299 out_err:
300         return -1;
301
302 map_exists:
303         //assert map loading when this is reached
304         if (m->flags & MF_MAP_LOADING) {
305                 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
306                                 "Adding to pending queue", m->volume);
307                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
308         }
309         else {
310                 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
311                 my_dispatch(peer, pr, pr->req);
312         }
313         return 0;
314 }
315
316
317 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
318                                 char *target, uint32_t targetlen, struct map **m)
319 {
320         struct mapperd *mapper = __get_mapperd(peer);
321         int r;
322         *m = find_map(mapper, target, targetlen);
323         if (*m) {
324                 if ((*m)->flags & MF_MAP_NOT_READY) {
325                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
326                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
327                         return MF_PENDING;
328                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
329                 //      return -1;
330                 // 
331                 }else {
332                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
333                         return 0;
334                 }
335         }
336         r = load_map(peer, pr, target, targetlen);
337         if (r < 0)
338                 return -1; //error
339         return MF_PENDING;      
340 }
341
342 /*
343  * Object handling functions
344  */
345
346 struct map_node *find_object(struct map *map, uint64_t obj_index)
347 {
348         struct map_node *mn;
349         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
350         if (r < 0)
351                 return NULL;
352         return mn;
353 }
354
355 static int insert_object(struct map *map, struct map_node *mn)
356 {
357         //FIXME no find object first
358         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
359         if (r == -XHASH_ERESIZE) {
360                 unsigned long shift = xhash_grow_size_shift(map->objects);
361                 map->objects = xhash_resize(map->objects, shift, NULL);
362                 if (!map->objects)
363                         return -1;
364                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
365         }
366         return r;
367 }
368
369
370 /*
371  * map read/write functions 
372  */
373 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
374 {
375         int i;
376         //hexlify sha256 value
377         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
378                 sprintf(mn->object+2*i, "%02x", buf[i]);
379         }
380
381         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
382         mn->objectlen = SHA256_DIGEST_SIZE * 2;
383         mn->flags = MF_OBJECT_EXIST;
384 }
385
386 static inline void map_to_object(struct map_node *mn, char *buf)
387 {
388         char c = buf[0];
389         mn->flags = 0;
390         if (c)
391                 mn->flags |= MF_OBJECT_EXIST;
392         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
393         mn->object[XSEG_MAX_TARGETLEN] = 0;
394         mn->objectlen = strlen(mn->object);
395 }
396
397 static inline void object_to_map(char* buf, struct map_node *mn)
398 {
399         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
400         memcpy(buf+1, mn->object, mn->objectlen);
401         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
402 }
403
404 static inline void mapheader_to_map(struct map *m, char *buf)
405 {
406         uint64_t pos = 0;
407         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
408         pos += SHA256_DIGEST_SIZE;
409         memcpy(buf + pos, &m->size, sizeof(m->size));
410         pos += sizeof(m->size);
411 }
412
413
414 static int object_write(struct peerd *peer, struct peer_req *pr, 
415                                 struct map *map, struct map_node *mn)
416 {
417         void *dummy;
418         struct mapperd *mapper = __get_mapperd(peer);
419         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
420                                                         mapper->mbportno, X_ALLOC);
421         if (!req){
422                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
423                                 "(Map: %s [%llu]",
424                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
425                 goto out_err;
426         }
427         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, objectsize_in_map);
428         if (r < 0){
429                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
430                                 "(Map: %s [%llu]",
431                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
432                 goto out_put;
433         }
434         char *target = xseg_get_target(peer->xseg, req);
435         strncpy(target, map->volume, map->volumelen);
436         req->size = objectsize_in_map;
437         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
438         req->op = X_WRITE;
439         char *data = xseg_get_data(peer->xseg, req);
440         object_to_map(data, mn);
441
442         r = xseg_set_req_data(peer->xseg, req, pr);
443         if (r < 0){
444                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
445                                 "(Map: %s [%llu]",
446                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
447                 goto out_put;
448         }
449         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
450         if (p == NoPort){
451                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
452                                 "(Map: %s [%llu]",
453                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
454                 goto out_unset;
455         }
456         r = xseg_signal(peer->xseg, p);
457         if (r < 0)
458                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
459
460         XSEGLOG2(&lc, I, "Writing object %s \n\t"
461                         "Map: %s [%llu]",
462                         mn->object, map->volume, (unsigned long long) mn->objectidx);
463
464         return MF_PENDING;
465
466 out_unset:
467         xseg_get_req_data(peer->xseg, req, &dummy);
468 out_put:
469         xseg_put_request(peer->xseg, req, peer->portno);
470 out_err:
471         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
472                         "(Map: %s [%llu]",
473                         mn->object, map->volume, (unsigned long long) mn->objectidx);
474         return -1;
475 }
476
477 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
478 {
479         void *dummy;
480         struct mapperd *mapper = __get_mapperd(peer);
481         struct map_node *mn;
482         uint64_t i, pos, max_objidx = calc_map_obj(map);
483         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
484                                                         mapper->mbportno, X_ALLOC);
485         if (!req){
486                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
487                 goto out_err;
488         }
489         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
490                                         mapheader_size + max_objidx * objectsize_in_map);
491         if (r < 0){
492                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
493                 goto out_put;
494         }
495         char *target = xseg_get_target(peer->xseg, req);
496         strncpy(target, map->volume, req->targetlen);
497         char *data = xseg_get_data(peer->xseg, req);
498         mapheader_to_map(map, data);
499         pos = mapheader_size;
500         req->op = X_WRITE;
501         req->size = req->datalen;
502         req->offset = 0;
503
504         if (map->size % block_size)
505                 max_objidx++;
506         for (i = 0; i < max_objidx; i++) {
507                 mn = find_object(map, i);
508                 if (!mn){
509                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
510                                         (unsigned long long) i, map->volume);
511                         goto out_put;
512                 }
513                 object_to_map(data+pos, mn);
514                 pos += objectsize_in_map;
515         }
516         r = xseg_set_req_data(peer->xseg, req, pr);
517         if (r < 0){
518                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
519                                 map->volume);
520                 goto out_put;
521         }
522         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
523         if (p == NoPort){
524                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
525                                 map->volume);
526                 goto out_unset;
527         }
528         r = xseg_signal(peer->xseg, p);
529         if (r < 0)
530                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
531
532         map->flags |= MF_MAP_WRITING;
533         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
534         return MF_PENDING;
535
536 out_unset:
537         xseg_get_req_data(peer->xseg, req, &dummy);
538 out_put:
539         xseg_put_request(peer->xseg, req, peer->portno);
540 out_err:
541         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
542         return -1;
543 }
544
545 static int read_map (struct peerd *peer, struct map *map, char *buf)
546 {
547         char nulls[SHA256_DIGEST_SIZE];
548         memset(nulls, 0, SHA256_DIGEST_SIZE);
549
550         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
551         if (r) {
552                 //read error;
553                 return -1;
554         }
555         //type 1, our type, type 0 pithos map
556         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
557         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
558         uint64_t pos;
559         uint64_t i, nr_objs;
560         struct map_node *map_node;
561         if (type) {
562                 pos = SHA256_DIGEST_SIZE;
563                 map->size = *(uint64_t *) (buf + pos);
564                 pos += sizeof(uint64_t);
565                 nr_objs = map->size / block_size;
566                 if (map->size % block_size)
567                         nr_objs++;
568                 map_node = calloc(nr_objs, sizeof(struct map_node));
569                 if (!map_node)
570                         return -1;
571
572                 for (i = 0; i < nr_objs; i++) {
573                         map_node[i].map = map;
574                         map_node[i].objectidx = i;
575                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
576                         (void) qidx;
577                         map_to_object(&map_node[i], buf + pos);
578                         pos += objectsize_in_map;
579                         r = insert_object(map, &map_node[i]); //FIXME error check
580                 }
581         } else {
582                 pos = 0;
583                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
584                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
585                 if (!map_node)
586                         return -1;
587                 for (i = 0; i < max_nr_objs; i++) {
588                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
589                                 break;
590                         map_node[i].objectidx = i;
591                         map_node[i].map = map;
592                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
593                         (void) qidx;
594                         pithosmap_to_object(&map_node[i], buf + pos);
595                         pos += SHA256_DIGEST_SIZE; 
596                         r = insert_object(map, &map_node[i]); //FIXME error check
597                 }
598                 map->size = i * block_size; 
599         }
600         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
601         return 0;
602
603         //FIXME cleanup on error
604 }
605
606 /*
607  * copy up functions
608  */
609
610 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
611 {
612         int r = 0;
613         /*
614         if (mn){
615                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
616                 if (r == -XHASH_ERESIZE) {
617                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
618                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
619                         if (!new_hashmap)
620                                 goto out;
621                         mio->copyups_nodes = new_hashmap;
622                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
623                 }
624         }
625         else {
626                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
627                 if (r == -XHASH_ERESIZE) {
628                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
629                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
630                         if (!new_hashmap)
631                                 goto out;
632                         mio->copyups_nodes = new_hashmap;
633                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
634                 }
635         }
636 out:
637         */
638         mio->copyup_node = mn;
639         return r;
640 }
641
642 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
643 {
644         /*
645         struct map_node *mn;
646         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
647         if (r < 0)
648                 return NULL;
649         return mn;
650         */
651         return mio->copyup_node;
652 }
653
654 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
655 {
656         struct mapperd *mapper = __get_mapperd(peer);
657         struct mapper_io *mio = __get_mapper_io(pr);
658         void *dummy;
659         int r = -1, i;
660         xport p;
661
662         //struct sha256_ctx sha256ctx;
663         uint32_t newtargetlen;
664         char new_target[XSEG_MAX_TARGETLEN + 1]; 
665         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
666         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
667         strncpy(new_object, mn->object, mn->objectlen);
668         sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
669         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
670
671         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
672         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
673                 sprintf (new_target + 2*i, "%02x", buf[i]);
674         newtargetlen = SHA256_DIGEST_SIZE  * 2;
675
676
677         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
678                                                         mapper->bportno, X_ALLOC);
679         if (!req)
680                 goto out_err;
681         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
682                                 sizeof(struct xseg_request_copy));
683         if (r < 0)
684                 goto out_put;
685
686         char *target = xseg_get_target(peer->xseg, req);
687         strncpy(target, new_target, newtargetlen);
688
689         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
690         strncpy(xcopy->target, mn->object, mn->objectlen);
691         xcopy->targetlen = mn->objectlen;
692
693         req->offset = 0;
694         req->size = block_size;
695         req->op = X_COPY;
696         r = xseg_set_req_data(peer->xseg, req, pr);
697         if (r<0)
698                 goto out_put;
699         r = __set_copyup_node(mio, req, mn);
700         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
701         if (p == NoPort) {
702                 goto out_unset;
703         }
704         xseg_signal(peer->xseg, p);
705         mio->copyups++;
706
707         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
708         return 0;
709
710 out_unset:
711         xseg_get_req_data(peer->xseg, req, &dummy);
712 out_put:
713         xseg_put_request(peer->xseg, req, peer->portno);
714 out_err:
715         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
716         return -1;
717
718 }
719
720 /*
721  * request handling functions
722  */
723
724 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
725                                 struct xseg_request *req)
726 {
727         int r;
728         xqindex idx;
729         char buf[XSEG_MAX_TARGETLEN];
730         struct mapperd *mapper = __get_mapperd(peer);
731         //assert req->op = X_READ;
732         char *target = xseg_get_target(peer->xseg, req);
733         struct map *map = find_map(mapper, target, req->targetlen);
734         if (!map)
735                 goto out_err;
736         //assert map->flags & MF_MAP_LOADING
737
738         if (req->state & XS_FAILED)
739                 goto out_fail;
740
741         char *data = xseg_get_data(peer->xseg, req);
742         r = read_map(peer, map, data);
743         if (r < 0)
744                 goto out_fail;
745         
746         xseg_put_request(peer->xseg, req, peer->portno);
747         map->flags &= ~MF_MAP_LOADING;
748         XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
749         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
750                 struct peer_req *preq = (struct peer_req *) idx;
751                 my_dispatch(peer, preq, preq->req);
752         }
753         return 0;
754
755 out_fail:
756         XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
757         xseg_put_request(peer->xseg, req, peer->portno);
758         map->flags &= ~MF_MAP_LOADING;
759         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
760                 struct peer_req *preq = (struct peer_req *) idx;
761                 fail(peer, preq);
762         }
763         remove_map(mapper, map);
764         //FIXME not freeing up all objects + object hash
765         free(map);
766         return 0;
767
768 out_err:
769         strncpy(buf, target, req->targetlen);
770         buf[req->targetlen] = 0;
771         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
772         xseg_put_request(peer->xseg, req, peer->portno);
773         return -1;
774 }
775
776 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
777                                 struct xseg_request *req)
778 {
779         xqindex idx;
780         char buf[XSEG_MAX_TARGETLEN];
781         struct mapperd *mapper = __get_mapperd(peer);
782         //assert req->op = X_WRITE;
783         char *target = xseg_get_target(peer->xseg, req);
784         struct map *map = find_map(mapper, target, req->targetlen);
785         if (!map) {
786                 fprintf(stderr, "couldn't find map\n");
787                 goto out_err;
788         }
789         //assert map->flags & MF_MAP_WRITING
790
791         if (req->state & XS_FAILED){
792                 fprintf(stderr, "write request failed\n");
793                 goto out_fail;
794         }
795         
796         xseg_put_request(peer->xseg, req, peer->portno);
797         map->flags &= ~MF_MAP_WRITING;
798         XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
799         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
800                 struct peer_req *preq = (struct peer_req *) idx;
801                 my_dispatch(peer, preq, preq->req);
802         }
803         return 0;
804
805
806 out_fail:
807         XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
808         xseg_put_request(peer->xseg, req, peer->portno);
809         map->flags &= ~MF_MAP_WRITING;
810         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
811                 struct peer_req *preq = (struct peer_req *) idx;
812                 fail(peer, preq);
813         }
814         remove_map(mapper, map);
815         //FIXME not freeing up all objects + object hash
816         free(map);
817         return 0;
818
819 out_err:
820         strncpy(buf, target, req->targetlen);
821         buf[req->targetlen] = 0;
822         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
823         xseg_put_request(peer->xseg, req, peer->portno);
824         return -1;
825 }
826
827 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
828                                 struct xseg_request *req)
829 {
830         struct mapperd *mapper = __get_mapperd(peer);
831         struct mapper_io *mio = __get_mapper_io(pr);
832         (void) mio;
833         int r;
834         char buf[XSEG_MAX_TARGETLEN + 1];
835         char *target;
836
837         if (pr->req->op != X_CLONE) {
838                 //wtf??
839                 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
840                 fail(peer, pr);
841                 return 0;
842         }
843
844         if (req->op == X_WRITE){
845                         //assert state = WRITING;
846                         r = handle_mapwrite(peer, pr ,req);
847                         if (r < 0){
848                                 XSEGLOG2(&lc, E, "handle mapwrite returned error");
849                                 fail(peer, pr);
850                         }
851                         return 0;
852         }
853
854         if (mio->state == WRITING) {
855                 target = xseg_get_target(peer->xseg, pr->req);
856                 strncpy(buf, target, req->targetlen);
857                 buf[req->targetlen] = 0;
858                 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
859                 complete(peer, pr);
860                 return 0;
861         }
862
863         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
864         if (!xclone) {
865                 goto out_err;
866         }
867         struct map *map;
868         r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
869         if (r < 0){
870                 goto out_err;
871         }
872         else if (r == MF_PENDING)
873                 return 0;
874         
875         if (map->flags & MF_MAP_DESTROYED) {
876                 strncpy(buf, xclone->target, xclone->targetlen);
877                 buf[xclone->targetlen] = 0;
878                 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
879                 target = xseg_get_target(peer->xseg, pr->req);
880                 strncpy(buf, target, req->targetlen);
881                 buf[req->targetlen] = 0;
882                 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
883                 fail(peer, pr);
884                 return 0;
885         }
886
887         struct map *clonemap = malloc(sizeof(struct map));
888         if (!clonemap) {
889                 goto out_err;
890         }
891         /*
892         FIXME check if clone map exists
893         find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
894         ... (on destroyed what ??
895         if (clonemap) {
896                 target = xseg_get_target(peer->xseg, pr->req);
897                 strncpy(buf, target, req->targetlen);
898                 buf[req->targetlen] = 0;
899                 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
900                 fail(peer, pr);
901                 return 0;
902         }
903         */
904         //alloc and init struct map
905         clonemap->objects = xhash_new(3, INTEGER);
906         if (!clonemap->objects){
907                 goto out_err_clonemap;
908         }
909         xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
910         if (!qidx){
911                 goto out_err_objhash;
912         }
913         if (xclone->size < map->size) {
914                 target = xseg_get_target(peer->xseg, pr->req);
915                 strncpy(buf, target, req->targetlen);
916                 buf[req->targetlen] = 0;
917                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
918                                 "\n\t for requested clone %s",
919                                 (unsigned long long) xclone->size,
920                                 (unsigned long long) map->size, buf);
921                 goto out_err_q;
922         }
923         if (xclone->size == -1)
924                 clonemap->size = map->size;
925         else
926                 clonemap->size = xclone->size;
927         clonemap->flags = 0;
928         target = xseg_get_target(peer->xseg, pr->req);
929         strncpy(clonemap->volume, target, pr->req->targetlen);
930         clonemap->volumelen = pr->req->targetlen;
931         clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
932
933         //alloc and init map_nodes
934         unsigned long c = clonemap->size/block_size + 1;
935         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
936         if (!map_nodes){
937                 goto out_err_q;
938         }
939         int i;
940         for (i = 0; i < clonemap->size/block_size + 1; i++) {
941                 struct map_node *mn = find_object(map, i);
942                 if (mn) {
943                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
944                         map_nodes[i].objectlen = mn->objectlen;
945                 } else {
946                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
947                         map_nodes[i].objectlen = strlen(zero_block);
948                 }
949                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
950                 map_nodes[i].flags = 0;
951                 map_nodes[i].objectidx = i;
952                 map_nodes[i].map = clonemap;
953                 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
954                 r = insert_object(clonemap, &map_nodes[i]);
955                 if (r < 0){
956                         goto out_free_all;
957                 }
958         }
959         //insert map
960         r = insert_map(mapper, clonemap);
961         if ( r < 0) {
962                 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
963                 goto out_free_all;
964         }
965         r = map_write(peer, pr, clonemap);
966         if (r < 0){
967                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
968                 goto out_remove;
969         }
970         else if (r == MF_PENDING) {
971                 //maybe move this to map_write
972                 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
973                 __xq_append_tail(&clonemap->pending, (xqindex) pr);
974                 mio->state = WRITING;
975                 return 0;
976         } else {
977                 //unknown state
978                 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
979                 goto out_remove;
980         }
981         
982         return 0;
983
984 out_remove:
985         remove_map(mapper, clonemap);
986 out_free_all:
987         //FIXME not freeing allocated queues of map_nodes
988         free(map_nodes);
989 out_err_q:
990         xq_free(&clonemap->pending);
991 out_err_objhash:
992         xhash_free(clonemap->objects);
993 out_err_clonemap:
994         free(clonemap);
995 out_err:
996         target = xseg_get_target(peer->xseg, pr->req);
997         strncpy(buf, target, req->targetlen);
998         buf[req->targetlen] = 0;
999         XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
1000         fail(peer, pr);
1001         return -1;
1002 }
1003
1004 static int req2objs(struct peerd *peer, struct peer_req *pr, 
1005                                         struct map *map, int write)
1006 {
1007         char *target = xseg_get_target(peer->xseg, pr->req);
1008         uint32_t nr_objs = calc_nr_obj(pr->req);
1009         uint64_t size = sizeof(struct xseg_reply_map) + 
1010                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1011
1012         /* resize request to fit reply */
1013         char buf[XSEG_MAX_TARGETLEN];
1014         strncpy(buf, target, pr->req->targetlen);
1015         int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1016         if (r < 0) {
1017                 XSEGLOG2(&lc, E, "Cannot resize request");
1018                 return -1;
1019         }
1020         target = xseg_get_target(peer->xseg, pr->req);
1021         strncpy(target, buf, pr->req->targetlen);
1022
1023         /* structure reply */
1024         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1025         reply->cnt = nr_objs;
1026
1027         uint32_t idx = 0;
1028         uint64_t rem_size = pr->req->size;
1029         uint64_t obj_index = pr->req->offset / block_size;
1030         uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1031         uint64_t obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1032         struct map_node * mn = find_object(map, obj_index);
1033         if (!mn) {
1034                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1035                 XSEGLOG2(&lc, E, "pr->req->offset: %llu, block_size %u\n", 
1036                                 (unsigned long long) pr->req->offset, 
1037                                 block_size);
1038                 goto out_err;
1039         }
1040         if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1041                 goto out_object_copying;
1042         if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1043                 //calc new_target, copy up object
1044                 r = copyup_object(peer, mn, pr);
1045                 if (r < 0) {
1046                         XSEGLOG2(&lc, E, "Error in copy up object");
1047                         goto out_err_copy;
1048                 }
1049                 mn->flags |= MF_OBJECT_COPYING;
1050                 goto out_object_copying;
1051         }
1052
1053         strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1054         reply->segs[idx].targetlen = mn->objectlen;
1055         reply->segs[idx].target[mn->objectlen] = 0;
1056         reply->segs[idx].offset = obj_offset;
1057         reply->segs[idx].size = obj_size;
1058         rem_size -= obj_size;
1059         while (rem_size > 0) {
1060                 idx++;
1061                 obj_index++;
1062                 obj_offset = 0;
1063                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1064                 rem_size -= obj_size;
1065                 mn = find_object(map, obj_index);
1066                 if (!mn) {
1067                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1068                         goto out_err;
1069                 }
1070                 if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1071                         goto out_object_copying;
1072                 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1073                         //calc new_target, copy up object
1074                         r = copyup_object(peer, mn, pr);
1075                         if (r < 0) {
1076                                 XSEGLOG2(&lc, E, "Error in copy up object");
1077                                 goto out_err_copy;
1078                         }
1079                         mn->flags |= MF_OBJECT_COPYING;
1080                         goto out_object_copying;
1081                 }
1082                 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1083                 reply->segs[idx].targetlen = mn->objectlen;
1084                 reply->segs[idx].target[mn->objectlen] = 0;
1085                 reply->segs[idx].offset = obj_offset;
1086                 reply->segs[idx].size = obj_size;
1087         }
1088
1089         return 0;
1090
1091 out_object_copying:
1092         //printf("r2o mn: %lx\n", mn);
1093         //printf("volume %s pending on %s\n", map->volume, mn->object);
1094         //assert !write
1095         if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1096                 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1097         XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1098                         mn->object, (unsigned long long) mn->objectidx, map->volume);
1099         return MF_PENDING;
1100
1101 out_err_copy:
1102 out_err:
1103         return -1;
1104 }
1105
1106 static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
1107                                 struct xseg_request *req)
1108 {
1109         struct mapperd *mapper = __get_mapperd(peer);
1110         struct mapper_io *mio = __get_mapper_io(pr);
1111         (void)mapper;
1112         (void)mio;
1113         //get_map
1114         char *target = xseg_get_target(peer->xseg, pr->req);
1115         struct map *map;
1116         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1117         if (r < 0) {
1118                 fail(peer, pr);
1119                 return -1;
1120         }
1121         else if (r == MF_PENDING)
1122                 return 0;
1123         
1124         if (map->flags & MF_MAP_DESTROYED) {
1125                 fail(peer, pr);
1126                 return 0;
1127         }
1128         
1129         //get_object
1130         r = req2objs(peer, pr, map, 0);
1131         if  (r < 0){
1132                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1133                                 map->volume, 
1134                                 (unsigned long long) pr->req->offset, 
1135                                 (unsigned long long) (pr->req->offset + pr->req->size));
1136                 fail(peer, pr);
1137         }
1138         else if (r == 0)
1139                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1140                                 map->volume, 
1141                                 (unsigned long long) pr->req->offset, 
1142                                 (unsigned long long) (pr->req->offset + pr->req->size));
1143                 complete(peer, pr);
1144
1145         return 0;
1146
1147
1148 }
1149
1150 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1151                                 struct xseg_request *req)
1152 {
1153         struct mapperd *mapper = __get_mapperd(peer);
1154         (void) mapper;
1155         struct mapper_io *mio = __get_mapper_io(pr);
1156         int r = 0;
1157         xqindex idx;
1158         struct map_node *mn = __get_copyup_node(mio, req);
1159         if (!mn)
1160                 goto out_err;
1161
1162         mn->flags &= ~MF_OBJECT_COPYING;
1163         if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1164                 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1165                 goto out_fail;
1166         }
1167         struct map *map = mn->map;
1168         if (!map){
1169                 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1170                 goto out_fail;
1171         }
1172         
1173         /* construct a tmp map_node for writing purposes */
1174         char *target = xseg_get_target(peer->xseg, req);
1175         struct map_node newmn = *mn;
1176         newmn.flags = MF_OBJECT_EXIST;
1177         strncpy(newmn.object, target, req->targetlen);
1178         newmn.object[req->targetlen] = 0;
1179         newmn.objectlen = req->targetlen;
1180         newmn.objectidx = mn->objectidx; 
1181         r = object_write(peer, pr, map, &newmn);
1182         if (r != MF_PENDING){
1183                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1184                                 "\n\t of map %s [%llu]",
1185                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1186                 goto out_fail;
1187         }
1188         mn->flags |= MF_OBJECT_WRITING;
1189         xseg_put_request(peer->xseg, req, peer->portno);
1190         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1191         return 0;
1192
1193 out_fail:
1194         xseg_put_request(peer->xseg, req, peer->portno);
1195         __set_copyup_node(mio, req, NULL);
1196         while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1197                 struct peer_req * preq = (struct peer_req *) idx;
1198                 fail(peer, preq);
1199         }
1200
1201 out_err:
1202         XSEGLOG2(&lc, E, "Cannot get map node");
1203         return -1;
1204 }
1205
1206 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1207                                 struct xseg_request *req)
1208 {
1209         xqindex idx;
1210         struct mapperd *mapper = __get_mapperd(peer);
1211         struct mapper_io *mio = __get_mapper_io(pr);
1212         //assert req->op = X_WRITE;
1213         char *target = xseg_get_target(peer->xseg, req);
1214         (void)target;
1215         (void)mapper;
1216         //printf("handle object write replyi\n");
1217         struct map_node *mn = __get_copyup_node(mio, req);
1218         if (!mn)
1219                 goto out_err;
1220         
1221         __set_copyup_node(mio, req, NULL);
1222         
1223         //assert mn->flags & MF_OBJECT_WRITING
1224         mn->flags &= ~MF_OBJECT_WRITING;
1225         if (req->state & XS_FAILED)
1226                 goto out_fail;
1227
1228         struct map_node tmp;
1229         char *data = xseg_get_data(peer->xseg, req);
1230         map_to_object(&tmp, data);
1231         mn->flags |= MF_OBJECT_EXIST;
1232         if (mn->flags != MF_OBJECT_EXIST){
1233                 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1234                 return *(int *) 0;
1235         }
1236         //assert mn->flags & MF_OBJECT_EXIST
1237         strncpy(mn->object, tmp.object, tmp.objectlen);
1238         mn->object[tmp.objectlen] = 0;
1239         mn->objectlen = tmp.objectlen;
1240         xseg_put_request(peer->xseg, req, peer->portno);
1241
1242         XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1243         while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1244                 struct peer_req * preq = (struct peer_req *) idx;
1245                 my_dispatch(peer, preq, preq->req);
1246         }
1247         return 0;
1248
1249 out_fail:
1250         XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1251         xseg_put_request(peer->xseg, req, peer->portno);
1252         while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1253                 struct peer_req *preq = (struct peer_req *) idx;
1254                 fail(peer, preq);
1255         }
1256         return 0;
1257
1258 out_err:
1259         XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1260         xseg_put_request(peer->xseg, req, peer->portno);
1261         return -1;
1262 }
1263
1264 static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
1265                                 struct xseg_request *req)
1266 {
1267         struct mapperd *mapper = __get_mapperd(peer);
1268         struct mapper_io *mio = __get_mapper_io(pr);
1269         (void) mapper;
1270         (void) mio;
1271         /* handle copy up replies separately */
1272         if (req->op == X_COPY){
1273                 if (handle_copyup(peer, pr, req) < 0){
1274                         XSEGLOG2(&lc, E, "Handle copy up returned error");
1275                         fail(peer, pr);
1276                         return -1;
1277                 } else {
1278                         return 0;
1279                 }
1280         }
1281         else if(req->op == X_WRITE){
1282                 /* handle replies of object write operations */
1283                 if (handle_objectwrite(peer, pr, req) < 0) {
1284                         XSEGLOG2(&lc, E, "Handle object write returned error");
1285                         fail(peer, pr);
1286                         return -1;
1287                 } else {
1288                         return 0;
1289                 }
1290         }
1291
1292         char *target = xseg_get_target(peer->xseg, pr->req);
1293         struct map *map;
1294         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1295         if (r < 0) {
1296                 fail(peer, pr);
1297                 return -1;
1298         }
1299         else if (r == MF_PENDING)
1300                 return 0;
1301         
1302         if (map->flags & MF_MAP_DESTROYED) {
1303                 fail(peer, pr);
1304                 return 0;
1305         }
1306
1307         r = req2objs(peer, pr, map, 1);
1308         if (r < 0){
1309                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1310                                 map->volume, 
1311                                 (unsigned long long) pr->req->offset, 
1312                                 (unsigned long long) (pr->req->offset + pr->req->size));
1313                 fail(peer, pr);
1314         }
1315         if (r == 0){
1316                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1317                                 map->volume, 
1318                                 (unsigned long long) pr->req->offset, 
1319                                 (unsigned long long) (pr->req->offset + pr->req->size));
1320                 complete(peer, pr);
1321         }
1322         //else copyup pending, wait for pr restart
1323
1324         return 0;
1325 }
1326
1327 static int handle_snap(struct peerd *peer, struct peer_req *pr, 
1328                                 struct xseg_request *req)
1329 {
1330         fail(peer, pr);
1331         return 0;
1332 }
1333
1334 static int handle_info(struct peerd *peer, struct peer_req *pr, 
1335                                 struct xseg_request *req)
1336 {
1337         struct mapperd *mapper = __get_mapperd(peer);
1338         struct mapper_io *mio = __get_mapper_io(pr);
1339         (void) mapper;
1340         (void) mio;
1341         char *target = xseg_get_target(peer->xseg, pr->req);
1342         if (!target) {
1343                 fail(peer, pr);
1344                 return 0;
1345         }
1346         //printf("Handle info\n");
1347         struct map *map;
1348         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1349         if (r < 0) {
1350                 fail(peer, pr);
1351                 return -1;
1352         }
1353         else if (r == MF_PENDING)
1354                 return 0;
1355         if (map->flags & MF_MAP_DESTROYED) {
1356                 fail(peer, pr);
1357                 return 0;
1358         }
1359         
1360         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1361         xinfo->size = map->size;
1362         complete(peer, pr);
1363
1364         return 0;
1365 }
1366
1367 static int delete_object(struct peerd *peer, struct peer_req *pr,
1368                                 struct map_node *mn)
1369 {
1370         void *dummy;
1371         struct mapperd *mapper = __get_mapperd(peer);
1372         struct mapper_io *mio = __get_mapper_io(pr);
1373
1374         if (xq_count(&mn->pending) != 0) {
1375                 mio->delobj = mn->objectidx;
1376                 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1377                 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1378                                 mn->object);
1379                 return MF_PENDING;
1380         }
1381
1382         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
1383                                                         mapper->bportno, X_ALLOC);
1384         if (!req)
1385                 goto out_err;
1386         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1387         if (r < 0)
1388                 goto out_put;
1389         char *target = xseg_get_target(peer->xseg, req);
1390         strncpy(target, mn->object, req->targetlen);
1391         req->op = X_DELETE;
1392         req->size = req->datalen;
1393         req->offset = 0;
1394
1395         r = xseg_set_req_data(peer->xseg, req, pr);
1396         if (r < 0)
1397                 goto out_put;
1398         __set_copyup_node(mio, req, mn);
1399         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1400         if (p == NoPort)
1401                 goto out_unset;
1402         r = xseg_signal(peer->xseg, p);
1403         mn->flags |= MF_OBJECT_DELETING;
1404         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1405         return MF_PENDING;
1406
1407 out_unset:
1408         xseg_get_req_data(peer->xseg, req, &dummy);
1409 out_put:
1410         xseg_put_request(peer->xseg, req, peer->portno);
1411 out_err:
1412         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1413         return -1;
1414 }
1415 static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
1416                                  struct map_node *mn, int err)
1417 {
1418         struct mapperd *mapper = __get_mapperd(peer);
1419         struct mapper_io *mio = __get_mapper_io(pr);
1420         uint64_t idx;
1421         struct map *map = mn->map;
1422         int r;
1423         (void) mio;
1424         //if object deletion failed, map deletion must continue
1425         //and report OK, since map block has been deleted succesfully
1426         //so, no check for err
1427
1428         //assert object flags OK
1429         //free map_node_resources
1430         map->flags &= ~MF_OBJECT_DELETING;
1431         xq_free(&mn->pending);
1432         //find next object
1433         idx = mn->objectidx;
1434         //remove_object(map, idx);
1435         idx++;
1436         mn = find_object(map, idx);
1437         while (!mn && idx < calc_map_obj(map)) {
1438                 idx++;
1439                 mn = find_object(map, idx);
1440         }
1441         if (mn) {
1442                 //delete next object or complete;
1443                 r = delete_object(peer, pr, mn);
1444                 if (r < 0) {
1445                         XSEGLOG2(&lc, E, "Object %s delete object return error"
1446                                          "\n\t Map: %s [%llu]", 
1447                                          mn->object, mn->map->volume, 
1448                                          (unsigned long long) mn->objectidx);
1449                         goto del_completed;
1450                 }
1451                 XSEGLOG2(&lc, I, "Handle object delete OK");
1452         } else {
1453 del_completed:
1454                 //assert map flags OK
1455                 map->flags &= ~MF_MAP_DELETING;
1456                 map->flags |= MF_MAP_DESTROYED;
1457                 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1458                 //make all pending requests on map to fail
1459                 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1460                         struct peer_req * preq = (struct peer_req *) idx;
1461                         my_dispatch(peer, preq, preq->req);
1462                 }
1463                 //free map resources;
1464                 remove_map(mapper, map);
1465                 mn = find_object(map, 0);
1466                 free(mn);
1467                 xq_free(&map->pending);
1468                 free(map);
1469         }
1470         return 0;
1471 }
1472
1473 static int delete_map(struct peerd *peer, struct peer_req *pr,
1474                         struct map *map)
1475 {
1476         void *dummy;
1477         struct mapperd *mapper = __get_mapperd(peer);
1478         struct mapper_io *mio = __get_mapper_io(pr);
1479         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
1480                                                         mapper->mbportno, X_ALLOC);
1481         if (!req)
1482                 goto out_err;
1483         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1484         if (r < 0)
1485                 goto out_put;
1486         char *target = xseg_get_target(peer->xseg, req);
1487         strncpy(target, map->volume, req->targetlen);
1488         req->op = X_DELETE;
1489         req->size = req->datalen;
1490         req->offset = 0;
1491
1492         r = xseg_set_req_data(peer->xseg, req, pr);
1493         if (r < 0)
1494                 goto out_put;
1495         __set_copyup_node(mio, req, NULL);
1496         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1497         if (p == NoPort)
1498                 goto out_unset;
1499         r = xseg_signal(peer->xseg, p);
1500         map->flags |= MF_MAP_DELETING;
1501         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1502         return MF_PENDING;
1503
1504 out_unset:
1505         xseg_get_req_data(peer->xseg, req, &dummy);
1506 out_put:
1507         xseg_put_request(peer->xseg, req, peer->portno);
1508 out_err:
1509         XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
1510         return -1;
1511 }
1512
1513 static int handle_map_delete(struct peerd *peer, struct peer_req *pr, 
1514                                 struct map *map, int err)
1515 {
1516         struct mapperd *mapper = __get_mapperd(peer);
1517         struct mapper_io *mio = __get_mapper_io(pr);
1518         xqindex idx;
1519         int r;
1520         (void) mio;
1521         map->flags &= ~MF_MAP_DELETING;
1522         if (err) {
1523                 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1524                 //dispatch all pending
1525                 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1526                         struct peer_req * preq = (struct peer_req *) idx;
1527                         my_dispatch(peer, preq, preq->req);
1528                 }
1529         } else {
1530                 map->flags |= MF_MAP_DESTROYED;
1531                 //delete all objects
1532                 XSEGLOG2(&lc, E, "Map %s map block deleted. Deleting objects", map->volume);
1533                 struct map_node *mn = find_object(map, 0);
1534                 if (!mn) {
1535                         XSEGLOG2(&lc, E, "Map %s has no object 0", map->volume);
1536                         //this should never happen
1537                         //make all pending requests on map to fail
1538                         while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1539                                 struct peer_req * preq = (struct peer_req *) idx;
1540                                 my_dispatch(peer, preq, preq->req);
1541                         }
1542                         //free map resources;
1543                         remove_map(mapper, map);
1544                         xq_free(&map->pending);
1545                         free(map);
1546                         return 0;
1547                 }
1548                 r = delete_object(peer, pr, mn);
1549                 if (r < 0) {
1550                         XSEGLOG2(&lc, E, "Deleting first object of map %s returned error"
1551                                         "\n\t Dispatching pending requests",
1552                                         map->volume);
1553                         //dispatch all pending
1554                         while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1555                                 struct peer_req * preq = (struct peer_req *) idx;
1556                                 my_dispatch(peer, preq, preq->req);
1557                         }
1558                 }
1559         }
1560         return 0;
1561 }
1562
1563 static int handle_delete(struct peerd *peer, struct peer_req *pr, 
1564                                 struct xseg_request *req)
1565 {
1566         struct mapperd *mapper = __get_mapperd(peer);
1567         struct mapper_io *mio = __get_mapper_io(pr);
1568         struct map_node *mn;
1569         struct map *map;
1570         int err = 0;
1571         if (req->state & XS_FAILED && !(req->state &XS_SERVED)) 
1572                 err = 1;
1573         
1574         mn = __get_copyup_node(mio, req);
1575         __set_copyup_node(mio, req, NULL);
1576         char *target = xseg_get_target(peer->xseg, req);
1577         if (!mn) {
1578                 //map block delete
1579                 map = find_map(mapper, target, req->targetlen);
1580                 if (!map) {
1581                         xseg_put_request(peer->xseg, req, peer->portno);
1582                         return -1;
1583                 }
1584                 handle_map_delete(peer, pr, map, err);
1585         } else {
1586                 //object delete
1587                 map = mn->map;
1588                 if (!map) {
1589                         xseg_put_request(peer->xseg, req, peer->portno);
1590                         return -1;
1591                 }
1592                 handle_object_delete(peer, pr, mn, err);
1593         }
1594         xseg_put_request(peer->xseg, req, peer->portno);
1595         return 0;
1596 }
1597
1598 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
1599                                 struct xseg_request *req)
1600 {
1601         struct mapperd *mapper = __get_mapperd(peer);
1602         struct mapper_io *mio = __get_mapper_io(pr);
1603         (void) mapper;
1604         int r;
1605
1606         if (pr->req != req && req->op == X_DELETE) {
1607                 //assert mio->state == DELETING
1608                 r = handle_delete(peer, pr, req);
1609                 if (r < 0) {
1610                         XSEGLOG2(&lc, E, "Handle delete returned error");
1611                         fail(peer, pr);
1612                         return -1;
1613                 } else {
1614                         return 0;
1615                 }
1616         }
1617
1618         struct map *map;
1619         char *target = xseg_get_target(peer->xseg, pr->req);
1620         r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1621         if (r < 0) {
1622                 fail(peer, pr);
1623                 return -1;
1624         }
1625         else if (r == MF_PENDING)
1626                 return 0;
1627         if (map->flags & MF_MAP_DESTROYED) {
1628                 if (mio->state == DELETING){
1629                         XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1630                         complete(peer, pr);
1631                 }
1632                 else{
1633                         XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1634                         fprintf(stderr, "map destroyed\n");
1635                         fail(peer, pr);
1636                 }
1637                 return 0;
1638         }
1639         if (mio->state == DELETING) {
1640                 //continue deleting map objects;
1641                 struct map_node *mn = find_object(map, mio->delobj);
1642                 if (!mn) {
1643                         complete(peer, pr);
1644                         return 0;
1645                 }
1646                 r = delete_object(peer, pr, mn);
1647                 if (r < 0) {
1648                         complete(peer, pr);
1649                 }
1650                 return 0;
1651         }
1652         //delete map block
1653         r = delete_map(peer, pr, map);
1654         if (r < 0) {
1655                 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1656                 fail(peer, pr);
1657                 return -1;
1658         } else if (r == MF_PENDING) {
1659                 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1660                 __xq_append_tail(&map->pending, (xqindex) pr);
1661                 mio->state = DELETING;
1662                 return 0;
1663         }
1664         //unreachable
1665         XSEGLOG2(&lc, E, "Destroy unreachable");
1666         fail(peer, pr);
1667         return 0;
1668 }
1669
1670 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1671 {
1672         struct mapperd *mapper = __get_mapperd(peer);
1673         (void) mapper;
1674         struct mapper_io *mio = __get_mapper_io(pr);
1675         (void) mio;
1676
1677         if (req->op == X_READ) {
1678                 /* catch map reads requests here */
1679                 handle_mapread(peer, pr, req);
1680                 return 0;
1681         }
1682
1683         switch (pr->req->op) {
1684                 /* primary xseg operations of mapper */
1685                 case X_CLONE: handle_clone(peer, pr, req); break;
1686                 case X_MAPR: handle_mapr(peer, pr, req); break;
1687                 case X_MAPW: handle_mapw(peer, pr, req); break;
1688 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1689                 case X_INFO: handle_info(peer, pr, req); break;
1690                 case X_DELETE: handle_destroy(peer, pr, req); break;
1691                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1692         }
1693         return 0;
1694 }
1695
1696 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1697 {
1698         struct mapperd *mapper = __get_mapperd(peer);
1699         (void) mapper;
1700         struct mapper_io *mio = __get_mapper_io(pr);
1701         (void) mio;
1702
1703         if (pr->req == req)
1704                 mio->state = ACCEPTED;
1705         my_dispatch(peer, pr ,req);
1706         return 0;
1707 }
1708
1709 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1710 {
1711         int i;
1712         unsigned char buf[SHA256_DIGEST_SIZE];
1713         char *zero;
1714
1715         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1716
1717         /* Version check should be the very first call because it
1718           makes sure that important subsystems are intialized. */
1719         gcry_check_version (NULL);
1720      
1721         /* Disable secure memory.  */
1722         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1723      
1724         /* Tell Libgcrypt that initialization has completed. */
1725         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1726
1727         /* calculate out magic sha hash value */
1728         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1729
1730         /* calculate zero block */
1731         //FIXME check hash value
1732         zero = malloc(block_size);
1733         memset(zero, 0, block_size);
1734         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1735         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1736                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1737         printf("%s \n", zero_block);
1738         free(zero);
1739
1740         //FIXME error checks
1741         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1742         mapper->hashmaps = xhash_new(3, STRING);
1743         peer->priv = mapper;
1744         
1745         for (i = 0; i < peer->nr_ops; i++) {
1746                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1747                 mio->copyups_nodes = xhash_new(3, INTEGER);
1748                 mio->copyups = 0;
1749                 mio->err = 0;
1750                 peer->peer_reqs[i].priv = mio;
1751         }
1752
1753         for (i = 0; i < argc; i++) {
1754                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1755                         mapper->bportno = atoi(argv[i+1]);
1756                         i += 1;
1757                         continue;
1758                 }
1759                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1760                         mapper->mbportno = atoi(argv[i+1]);
1761                         i += 1;
1762                         continue;
1763                 }
1764                 /* enforce only one thread */
1765                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1766                         int t = atoi(argv[i+1]);
1767                         if (t != 1) {
1768                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1769                                 return -1;
1770                         }
1771                         i += 1;
1772                         continue;
1773                 }
1774         }
1775
1776 //      test_map(peer);
1777
1778         return 0;
1779 }
1780
1781 void print_obj(struct map_node *mn)
1782 {
1783         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
1784                         (unsigned long long) mn->objectidx, mn->object, 
1785                         (unsigned int) mn->objectlen, 
1786                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
1787 }
1788
1789 void print_map(struct map *m)
1790 {
1791         uint64_t nr_objs = m->size/block_size;
1792         if (m->size % block_size)
1793                 nr_objs++;
1794         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
1795                         m->volume, m->volumelen, 
1796                         (unsigned long long) m->size, 
1797                         (unsigned long long) nr_objs);
1798         uint64_t i;
1799         struct map_node *mn;
1800         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1801                 return;
1802         for (i = 0; i < nr_objs; i++) {
1803                 mn = find_object(m, i);
1804                 if (!mn){
1805                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
1806                         continue;
1807                 }
1808                 print_obj(mn);
1809         }
1810 }
1811
1812 /*
1813 void test_map(struct peerd *peer)
1814 {
1815         int i,j, ret;
1816         //struct sha256_ctx sha256ctx;
1817         unsigned char buf[SHA256_DIGEST_SIZE];
1818         char buf_new[XSEG_MAX_TARGETLEN + 20];
1819         struct map *m = malloc(sizeof(struct map));
1820         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
1821         m->volume[XSEG_MAX_TARGETLEN] = 0;
1822         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
1823         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
1824         m->volumelen = XSEG_MAX_TARGETLEN;
1825         m->size = 100*block_size;
1826         m->objects = xhash_new(3, INTEGER);
1827         struct map_node *map_node = calloc(100, sizeof(struct map_node));
1828         for (i = 0; i < 100; i++) {
1829                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
1830                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
1831                 
1832                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
1833                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
1834                 }
1835                 map_node[i].objectidx = i;
1836                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
1837                 map_node[i].flags = MF_OBJECT_EXIST;
1838                 ret = insert_object(m, &map_node[i]);
1839         }
1840
1841         char *data = malloc(block_size);
1842         mapheader_to_map(m, data);
1843         uint64_t pos = mapheader_size;
1844
1845         for (i = 0; i < 100; i++) {
1846                 map_node = find_object(m, i);
1847                 if (!map_node){
1848                         printf("no object node %d \n", i);
1849                         exit(1);
1850                 }
1851                 object_to_map(data+pos, map_node);
1852                 pos += objectsize_in_map;
1853         }
1854 //      print_map(m);
1855
1856         struct map *m2 = malloc(sizeof(struct map));
1857         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
1858         m->volume[XSEG_MAX_TARGETLEN] = 0;
1859         m->volumelen = XSEG_MAX_TARGETLEN;
1860
1861         m2->objects = xhash_new(3, INTEGER);
1862         ret = read_map(peer, m2, data);
1863 //      print_map(m2);
1864
1865         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
1866         ssize_t r, sum = 0;
1867         while (sum < block_size) {
1868                 r = write(fd, data + sum, block_size -sum);
1869                 if (r < 0){
1870                         perror("write");
1871                         printf("write error\n");
1872                         exit(1);
1873                 } 
1874                 sum += r;
1875         }
1876         close(fd);
1877         map_node = find_object(m, 0);
1878         free(map_node);
1879         free(m);
1880 }
1881 */