added speer skeletor
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <mpeer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19
20 #define MF_PENDING 1
21
22 #define SHA256_DIGEST_SIZE 32
23 /* hex representation of sha256 value takes up double the sha256 size */
24 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
25
26 #define block_size (1<<22) //FIXME this should be defined here?
27 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
28 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
29
30 #define MF_OBJECT_EXIST         (1 << 0)
31 #define MF_OBJECT_COPYING       (1 << 1)
32 #define MF_OBJECT_WRITING       (1 << 2)
33 #define MF_OBJECT_DELETING      (1 << 3)
34
35 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
36 extern struct log_ctx lc;
37
38 char *magic_string = "This a magic string. Please hash me";
39 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
40 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
41
42 //internal mapper states
43 enum mapper_state {
44         ACCEPTED = 0,
45         WRITING = 1,
46         COPYING = 2,
47         DELETING = 3,
48         DROPPING_CACHE = 4
49 };
50
51 struct map_node {
52         uint32_t flags;
53         uint32_t objectidx;
54         uint32_t objectlen;
55         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
56         struct xq pending;                      /* pending peer_reqs on this object */
57         struct map *map;
58 };
59
60 #define MF_MAP_LOADING          (1 << 0)
61 #define MF_MAP_DESTROYED        (1 << 1)
62 #define MF_MAP_WRITING          (1 << 2)
63 #define MF_MAP_DELETING         (1 << 3)
64 #define MF_MAP_DROPPING_CACHE   (1 << 4)
65
66 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
67                                         MF_MAP_DROPPING_CACHE)
68
69 struct map {
70         uint32_t flags;
71         uint64_t size;
72         uint32_t volumelen;
73         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
74         xhash_t *objects;       /* obj_index --> map_node */
75         struct xq pending;      /* pending peer_reqs on this map */
76 };
77
78 struct mapperd {
79         xport bportno;          /* blocker that accesses data */
80         xport mbportno;         /* blocker that accesses maps */
81         xhash_t *hashmaps; // hash_function(target) --> struct map
82 };
83
84 struct mapper_io {
85         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
86         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
87         struct map_node *copyup_node;
88         int err;                        /* error flag */
89         uint64_t delobj;
90         uint64_t dcobj;
91         enum mapper_state state;
92 };
93
94 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
95 void print_map(struct map *m);
96
97 /*
98  * Helper functions
99  */
100
101 static inline struct mapperd * __get_mapperd(struct peerd *peer)
102 {
103         return (struct mapperd *) peer->priv;
104 }
105
106 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
107 {
108         return (struct mapper_io *) pr->priv;
109 }
110
111 static inline uint64_t calc_map_obj(struct map *map)
112 {
113         uint64_t nr_objs = map->size / block_size;
114         if (map->size % block_size)
115                 nr_objs++;
116         return nr_objs;
117 }
118
119 static uint32_t calc_nr_obj(struct xseg_request *req)
120 {
121         unsigned int r = 1;
122         uint64_t rem_size = req->size;
123         uint64_t obj_offset = req->offset & (block_size -1); //modulo
124         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
125         rem_size -= obj_size;
126         while (rem_size > 0) {
127                 obj_size = (rem_size > block_size) ? block_size : rem_size;
128                 rem_size -= obj_size;
129                 r++;
130         }
131
132         return r;
133 }
134
135 /*
136  * Maps handling functions
137  */
138
139 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
140 {
141         int r;
142         struct map *m = NULL;
143         char buf[XSEG_MAX_TARGETLEN+1];
144         //assert targetlen <= XSEG_MAX_TARGETLEN
145         strncpy(buf, target, targetlen);
146         buf[targetlen] = 0;
147         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
148         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
149         if (r < 0)
150                 return NULL;
151         return m;
152 }
153
154
155 static int insert_map(struct mapperd *mapper, struct map *map)
156 {
157         int r = -1;
158         
159         if (find_map(mapper, map->volume, map->volumelen)){
160                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
161                 goto out;
162         }
163
164         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
165                         map->volume, strlen(map->volume), (unsigned long) map);
166         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
167         while (r == -XHASH_ERESIZE) {
168                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
169                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
170                 if (!new_hashmap){
171                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
172                                         (unsigned long long) shift);
173                         goto out;
174                 }
175                 mapper->hashmaps = new_hashmap;
176                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
177         }
178 out:
179         return r;
180 }
181
182 static int remove_map(struct mapperd *mapper, struct map *map)
183 {
184         int r = -1;
185         
186         //assert no pending pr on map
187         
188         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
189         while (r == -XHASH_ERESIZE) {
190                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
191                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
192                 if (!new_hashmap){
193                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
194                                         (unsigned long long) shift);
195                         goto out;
196                 }
197                 mapper->hashmaps = new_hashmap;
198                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
199         }
200 out:
201         return r;
202 }
203
204 /* async map load */
205 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, 
206                         uint32_t targetlen)
207 {
208         int r;
209         xport p;
210         struct xseg_request *req;
211         struct mapperd *mapper = __get_mapperd(peer);
212         void *dummy;
213
214         struct map *m = find_map(mapper, target, targetlen);
215         if (!m) {
216                 m = malloc(sizeof(struct map));
217                 if (!m){
218                         XSEGLOG2(&lc, E, "Cannot allocate map ");
219                         goto out_err;
220                 }
221                 m->size = -1;
222                 strncpy(m->volume, target, targetlen);
223                 m->volume[targetlen] = 0;
224                 m->volumelen = targetlen;
225                 m->flags = MF_MAP_LOADING;
226                 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
227                 if (!qidx) {
228                         XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
229                                         m->volume);
230                         goto out_map;
231                 }
232                 m->objects = xhash_new(3, INTEGER); 
233                 if (!m->objects){
234                         XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
235                                         m->volume);
236                         goto out_q;
237                 }
238                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
239         } else {
240                 goto map_exists;
241         }
242
243         r = insert_map(mapper, m);
244         if (r < 0)  
245                 goto out_hash;
246         
247
248         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
249         if (!req){
250                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
251                                 m->volume);
252                 goto out_fail;
253         }
254
255         r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
256         if (r < 0){
257                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
258                                 m->volume);
259                 goto out_put;
260         }
261
262         char *reqtarget = xseg_get_target(peer->xseg, req);
263         if (!reqtarget)
264                 goto out_put;
265         strncpy(reqtarget, target, req->targetlen);
266         req->op = X_READ;
267         req->size = block_size;
268         req->offset = 0;
269         r = xseg_set_req_data(peer->xseg, req, pr);
270         if (r < 0){
271                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
272                                 m->volume);
273                 goto out_put;
274         }
275         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
276         if (p == NoPort){ 
277                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
278                                 m->volume);
279                 goto out_unset;
280         }
281         r = xseg_signal(peer->xseg, p);
282         
283         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
284         return 0;
285
286 out_unset:
287         xseg_get_req_data(peer->xseg, req, &dummy);
288 out_put:
289         xseg_put_request(peer->xseg, req, pr->portno);
290
291 out_fail:
292         remove_map(mapper, m);
293         xqindex idx;
294         while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
295                 fail(peer, (struct peer_req *) idx);
296         }
297
298 out_hash:
299         xhash_free(m->objects);
300 out_q:
301         xq_free(&m->pending);
302 out_map:
303         XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
304         free(m);
305 out_err:
306         return -1;
307
308 map_exists:
309         //assert map loading when this is reached
310         if (m->flags & MF_MAP_LOADING) {
311                 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
312                                 "Adding to pending queue", m->volume);
313                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
314         }
315         else {
316                 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
317                 my_dispatch(peer, pr, pr->req);
318         }
319         return 0;
320 }
321
322
323 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
324                                 char *target, uint32_t targetlen, struct map **m)
325 {
326         struct mapperd *mapper = __get_mapperd(peer);
327         int r;
328         *m = find_map(mapper, target, targetlen);
329         if (*m) {
330                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
331                 if ((*m)->flags & MF_MAP_NOT_READY) {
332                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
333                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
334                         return MF_PENDING;
335                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
336                 //      return -1;
337                 // 
338                 }else {
339                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
340                         return 0;
341                 }
342         }
343         r = load_map(peer, pr, target, targetlen);
344         if (r < 0)
345                 return -1; //error
346         return MF_PENDING;      
347 }
348
349 /*
350  * Object handling functions
351  */
352
353 struct map_node *find_object(struct map *map, uint64_t obj_index)
354 {
355         struct map_node *mn;
356         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
357         if (r < 0)
358                 return NULL;
359         return mn;
360 }
361
362 static int insert_object(struct map *map, struct map_node *mn)
363 {
364         //FIXME no find object first
365         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
366         if (r == -XHASH_ERESIZE) {
367                 unsigned long shift = xhash_grow_size_shift(map->objects);
368                 map->objects = xhash_resize(map->objects, shift, NULL);
369                 if (!map->objects)
370                         return -1;
371                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
372         }
373         return r;
374 }
375
376
377 /*
378  * map read/write functions 
379  */
380 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
381 {
382         int i;
383         //hexlify sha256 value
384         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
385                 sprintf(mn->object+2*i, "%02x", buf[i]);
386         }
387
388         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
389         mn->objectlen = SHA256_DIGEST_SIZE * 2;
390         mn->flags = MF_OBJECT_EXIST;
391 }
392
393 static inline void map_to_object(struct map_node *mn, char *buf)
394 {
395         char c = buf[0];
396         mn->flags = 0;
397         if (c)
398                 mn->flags |= MF_OBJECT_EXIST;
399         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
400         mn->object[XSEG_MAX_TARGETLEN] = 0;
401         mn->objectlen = strlen(mn->object);
402 }
403
404 static inline void object_to_map(char* buf, struct map_node *mn)
405 {
406         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
407         memcpy(buf+1, mn->object, mn->objectlen);
408         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
409 }
410
411 static inline void mapheader_to_map(struct map *m, char *buf)
412 {
413         uint64_t pos = 0;
414         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
415         pos += SHA256_DIGEST_SIZE;
416         memcpy(buf + pos, &m->size, sizeof(m->size));
417         pos += sizeof(m->size);
418 }
419
420
421 static int object_write(struct peerd *peer, struct peer_req *pr, 
422                                 struct map *map, struct map_node *mn)
423 {
424         void *dummy;
425         struct mapperd *mapper = __get_mapperd(peer);
426         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
427                                                         mapper->mbportno, X_ALLOC);
428         if (!req){
429                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
430                                 "(Map: %s [%llu]",
431                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
432                 goto out_err;
433         }
434         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
435         if (r < 0){
436                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
437                                 "(Map: %s [%llu]",
438                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
439                 goto out_put;
440         }
441         char *target = xseg_get_target(peer->xseg, req);
442         strncpy(target, map->volume, req->targetlen);
443         req->size = objectsize_in_map;
444         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
445         req->op = X_WRITE;
446         char *data = xseg_get_data(peer->xseg, req);
447         object_to_map(data, mn);
448
449         r = xseg_set_req_data(peer->xseg, req, pr);
450         if (r < 0){
451                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
452                                 "(Map: %s [%llu]",
453                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
454                 goto out_put;
455         }
456         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
457         if (p == NoPort){
458                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
459                                 "(Map: %s [%llu]",
460                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
461                 goto out_unset;
462         }
463         r = xseg_signal(peer->xseg, p);
464         if (r < 0)
465                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
466
467         XSEGLOG2(&lc, I, "Writing object %s \n\t"
468                         "Map: %s [%llu]",
469                         mn->object, map->volume, (unsigned long long) mn->objectidx);
470
471         return MF_PENDING;
472
473 out_unset:
474         xseg_get_req_data(peer->xseg, req, &dummy);
475 out_put:
476         xseg_put_request(peer->xseg, req, pr->portno);
477 out_err:
478         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
479                         "(Map: %s [%llu]",
480                         mn->object, map->volume, (unsigned long long) mn->objectidx);
481         return -1;
482 }
483
484 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
485 {
486         void *dummy;
487         struct mapperd *mapper = __get_mapperd(peer);
488         struct map_node *mn;
489         uint64_t i, pos, max_objidx = calc_map_obj(map);
490         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
491                                                         mapper->mbportno, X_ALLOC);
492         if (!req){
493                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
494                 goto out_err;
495         }
496         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
497                                         mapheader_size + max_objidx * objectsize_in_map);
498         if (r < 0){
499                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
500                 goto out_put;
501         }
502         char *target = xseg_get_target(peer->xseg, req);
503         strncpy(target, map->volume, req->targetlen);
504         char *data = xseg_get_data(peer->xseg, req);
505         mapheader_to_map(map, data);
506         pos = mapheader_size;
507         req->op = X_WRITE;
508         req->size = req->datalen;
509         req->offset = 0;
510
511         if (map->size % block_size)
512                 max_objidx++;
513         for (i = 0; i < max_objidx; i++) {
514                 mn = find_object(map, i);
515                 if (!mn){
516                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
517                                         (unsigned long long) i, map->volume);
518                         goto out_put;
519                 }
520                 object_to_map(data+pos, mn);
521                 pos += objectsize_in_map;
522         }
523         r = xseg_set_req_data(peer->xseg, req, pr);
524         if (r < 0){
525                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
526                                 map->volume);
527                 goto out_put;
528         }
529         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
530         if (p == NoPort){
531                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
532                                 map->volume);
533                 goto out_unset;
534         }
535         r = xseg_signal(peer->xseg, p);
536         if (r < 0)
537                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
538
539         map->flags |= MF_MAP_WRITING;
540         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
541         return MF_PENDING;
542
543 out_unset:
544         xseg_get_req_data(peer->xseg, req, &dummy);
545 out_put:
546         xseg_put_request(peer->xseg, req, pr->portno);
547 out_err:
548         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
549         return -1;
550 }
551
552 static int read_map (struct peerd *peer, struct map *map, char *buf)
553 {
554         char nulls[SHA256_DIGEST_SIZE];
555         memset(nulls, 0, SHA256_DIGEST_SIZE);
556
557         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
558         if (r) {
559                 //read error;
560                 return -1;
561         }
562         //type 1, our type, type 0 pithos map
563         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
564         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
565         uint64_t pos;
566         uint64_t i, nr_objs;
567         struct map_node *map_node;
568         if (type) {
569                 pos = SHA256_DIGEST_SIZE;
570                 map->size = *(uint64_t *) (buf + pos);
571                 pos += sizeof(uint64_t);
572                 nr_objs = map->size / block_size;
573                 if (map->size % block_size)
574                         nr_objs++;
575                 map_node = calloc(nr_objs, sizeof(struct map_node));
576                 if (!map_node)
577                         return -1;
578
579                 for (i = 0; i < nr_objs; i++) {
580                         map_node[i].map = map;
581                         map_node[i].objectidx = i;
582                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
583                         (void) qidx;
584                         map_to_object(&map_node[i], buf + pos);
585                         pos += objectsize_in_map;
586                         r = insert_object(map, &map_node[i]); //FIXME error check
587                 }
588         } else {
589                 pos = 0;
590                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
591                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
592                 if (!map_node)
593                         return -1;
594                 for (i = 0; i < max_nr_objs; i++) {
595                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
596                                 break;
597                         map_node[i].objectidx = i;
598                         map_node[i].map = map;
599                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
600                         (void) qidx;
601                         pithosmap_to_object(&map_node[i], buf + pos);
602                         pos += SHA256_DIGEST_SIZE; 
603                         r = insert_object(map, &map_node[i]); //FIXME error check
604                 }
605                 map->size = i * block_size; 
606         }
607         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
608         return 0;
609
610         //FIXME cleanup on error
611 }
612
613 /*
614  * copy up functions
615  */
616
617 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
618 {
619         int r = 0;
620         /*
621         if (mn){
622                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
623                 if (r == -XHASH_ERESIZE) {
624                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
625                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
626                         if (!new_hashmap)
627                                 goto out;
628                         mio->copyups_nodes = new_hashmap;
629                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
630                 }
631         }
632         else {
633                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
634                 if (r == -XHASH_ERESIZE) {
635                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
636                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
637                         if (!new_hashmap)
638                                 goto out;
639                         mio->copyups_nodes = new_hashmap;
640                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
641                 }
642         }
643 out:
644         */
645         mio->copyup_node = mn;
646         return r;
647 }
648
649 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
650 {
651         /*
652         struct map_node *mn;
653         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
654         if (r < 0)
655                 return NULL;
656         return mn;
657         */
658         return mio->copyup_node;
659 }
660
661 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
662 {
663         struct mapperd *mapper = __get_mapperd(peer);
664         struct mapper_io *mio = __get_mapper_io(pr);
665         struct map *map = mn->map;
666         void *dummy;
667         int r = -1, i;
668         xport p;
669
670         //struct sha256_ctx sha256ctx;
671         uint32_t newtargetlen;
672         char new_target[XSEG_MAX_TARGETLEN + 1]; 
673         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
674         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
675         strncpy(new_object, map->volume, map->volumelen);
676         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
677         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
678
679         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
680         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
681                 sprintf (new_target + 2*i, "%02x", buf[i]);
682         newtargetlen = SHA256_DIGEST_SIZE  * 2;
683
684         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
685                 goto copyup_zeroblock;
686
687         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
688                                                         mapper->bportno, X_ALLOC);
689         if (!req)
690                 goto out_err;
691         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
692                                 sizeof(struct xseg_request_copy));
693         if (r < 0)
694                 goto out_put;
695
696         char *target = xseg_get_target(peer->xseg, req);
697         strncpy(target, new_target, req->targetlen);
698
699         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
700         strncpy(xcopy->target, mn->object, mn->objectlen);
701         xcopy->targetlen = mn->objectlen;
702
703         req->offset = 0;
704         req->size = block_size;
705         req->op = X_COPY;
706         r = xseg_set_req_data(peer->xseg, req, pr);
707         if (r<0)
708                 goto out_put;
709         r = __set_copyup_node(mio, req, mn);
710         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
711         if (p == NoPort) {
712                 goto out_unset;
713         }
714         xseg_signal(peer->xseg, p);
715         mio->copyups++;
716
717         mn->flags |= MF_OBJECT_COPYING;
718         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
719         return 0;
720
721 out_unset:
722         r = __set_copyup_node(mio, req, NULL);
723         xseg_get_req_data(peer->xseg, req, &dummy);
724 out_put:
725         xseg_put_request(peer->xseg, req, pr->portno);
726 out_err:
727         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
728         return -1;
729
730 copyup_zeroblock:
731         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
732                         "Proceeding in writing the new object in map");
733         /* construct a tmp map_node for writing purposes */
734         struct map_node newmn = *mn;
735         newmn.flags = MF_OBJECT_EXIST;
736         strncpy(newmn.object, new_target, newtargetlen);
737         newmn.object[newtargetlen] = 0;
738         newmn.objectlen = newtargetlen;
739         newmn.objectidx = mn->objectidx; 
740         r = __set_copyup_node(mio, req, mn);
741         r = object_write(peer, pr, map, &newmn);
742         if (r != MF_PENDING){
743                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
744                                 "\n\t of map %s [%llu]",
745                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
746                 return -1;
747         }
748         mn->flags |= MF_OBJECT_WRITING;
749         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
750         return 0;
751 }
752
753 /*
754  * request handling functions
755  */
756
757 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
758                                 struct xseg_request *req)
759 {
760         int r;
761         xqindex idx;
762         char buf[XSEG_MAX_TARGETLEN];
763         struct mapperd *mapper = __get_mapperd(peer);
764         //assert req->op = X_READ;
765         char *target = xseg_get_target(peer->xseg, req);
766         struct map *map = find_map(mapper, target, req->targetlen);
767         if (!map)
768                 goto out_err;
769         //assert map->flags & MF_MAP_LOADING
770
771         if (req->state & XS_FAILED)
772                 goto out_fail;
773
774         char *data = xseg_get_data(peer->xseg, req);
775         r = read_map(peer, map, data);
776         if (r < 0)
777                 goto out_fail;
778         
779         xseg_put_request(peer->xseg, req, pr->portno);
780         map->flags &= ~MF_MAP_LOADING;
781         XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
782         uint64_t qsize = xq_count(&map->pending);
783         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
784                 qsize--;
785                 struct peer_req *preq = (struct peer_req *) idx;
786                 my_dispatch(peer, preq, preq->req);
787         }
788         return 0;
789
790 out_fail:
791         XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
792         xseg_put_request(peer->xseg, req, pr->portno);
793         map->flags &= ~MF_MAP_LOADING;
794         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
795                 struct peer_req *preq = (struct peer_req *) idx;
796                 fail(peer, preq);
797         }
798         remove_map(mapper, map);
799         //FIXME not freeing up all objects + object hash
800         free(map);
801         return 0;
802
803 out_err:
804         strncpy(buf, target, req->targetlen);
805         buf[req->targetlen] = 0;
806         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
807         xseg_put_request(peer->xseg, req, pr->portno);
808         return -1;
809 }
810
811 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
812                                 struct xseg_request *req)
813 {
814         xqindex idx;
815         char buf[XSEG_MAX_TARGETLEN];
816         struct mapperd *mapper = __get_mapperd(peer);
817         //assert req->op = X_WRITE;
818         char *target = xseg_get_target(peer->xseg, req);
819         struct map *map = find_map(mapper, target, req->targetlen);
820         if (!map) {
821                 fprintf(stderr, "couldn't find map\n");
822                 goto out_err;
823         }
824         //assert map->flags & MF_MAP_WRITING
825
826         if (req->state & XS_FAILED){
827                 fprintf(stderr, "write request failed\n");
828                 goto out_fail;
829         }
830         
831         xseg_put_request(peer->xseg, req, pr->portno);
832         map->flags &= ~MF_MAP_WRITING;
833         XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
834         uint64_t qsize = xq_count(&map->pending);
835         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
836                 qsize--;
837                 struct peer_req *preq = (struct peer_req *) idx;
838                 my_dispatch(peer, preq, preq->req);
839         }
840         return 0;
841
842
843 out_fail:
844         XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
845         xseg_put_request(peer->xseg, req, pr->portno);
846         map->flags &= ~MF_MAP_WRITING;
847         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
848                 struct peer_req *preq = (struct peer_req *) idx;
849                 fail(peer, preq);
850         }
851         remove_map(mapper, map);
852         //FIXME not freeing up all objects + object hash
853         free(map);
854         return 0;
855
856 out_err:
857         strncpy(buf, target, req->targetlen);
858         buf[req->targetlen] = 0;
859         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
860         xseg_put_request(peer->xseg, req, pr->portno);
861         return -1;
862 }
863
864 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
865                                 struct xseg_request *req)
866 {
867         struct mapperd *mapper = __get_mapperd(peer);
868         struct mapper_io *mio = __get_mapper_io(pr);
869         (void) mio;
870         int r;
871         char buf[XSEG_MAX_TARGETLEN + 1];
872         char *target;
873
874         if (pr->req->op != X_CLONE) {
875                 //wtf??
876                 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
877                 fail(peer, pr);
878                 return 0;
879         }
880
881         if (req->op == X_WRITE){
882                         //assert state = WRITING;
883                         r = handle_mapwrite(peer, pr ,req);
884                         if (r < 0){
885                                 XSEGLOG2(&lc, E, "handle mapwrite returned error");
886                                 fail(peer, pr);
887                         }
888                         return 0;
889         }
890
891         if (mio->state == WRITING) {
892                 target = xseg_get_target(peer->xseg, pr->req);
893                 strncpy(buf, target, req->targetlen);
894                 buf[req->targetlen] = 0;
895                 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
896                 complete(peer, pr);
897                 return 0;
898         }
899
900         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
901         if (!xclone) {
902                 goto out_err;
903         }
904         struct map *map;
905         r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
906         if (r < 0){
907                 goto out_err;
908         }
909         else if (r == MF_PENDING)
910                 return 0;
911         
912         if (map->flags & MF_MAP_DESTROYED) {
913                 strncpy(buf, xclone->target, xclone->targetlen);
914                 buf[xclone->targetlen] = 0;
915                 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
916                 target = xseg_get_target(peer->xseg, pr->req);
917                 strncpy(buf, target, req->targetlen);
918                 buf[req->targetlen] = 0;
919                 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
920                 fail(peer, pr);
921                 return 0;
922         }
923
924         struct map *clonemap = malloc(sizeof(struct map));
925         if (!clonemap) {
926                 goto out_err;
927         }
928         /*
929         FIXME check if clone map exists
930         find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
931         ... (on destroyed what ??
932         if (clonemap) {
933                 target = xseg_get_target(peer->xseg, pr->req);
934                 strncpy(buf, target, req->targetlen);
935                 buf[req->targetlen] = 0;
936                 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
937                 fail(peer, pr);
938                 return 0;
939         }
940         */
941         //alloc and init struct map
942         clonemap->objects = xhash_new(3, INTEGER);
943         if (!clonemap->objects){
944                 goto out_err_clonemap;
945         }
946         xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
947         if (!qidx){
948                 goto out_err_objhash;
949         }
950         if (xclone->size < map->size) {
951                 target = xseg_get_target(peer->xseg, pr->req);
952                 strncpy(buf, target, req->targetlen);
953                 buf[req->targetlen] = 0;
954                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
955                                 "\n\t for requested clone %s",
956                                 (unsigned long long) xclone->size,
957                                 (unsigned long long) map->size, buf);
958                 goto out_err_q;
959         }
960         if (xclone->size == -1)
961                 clonemap->size = map->size;
962         else
963                 clonemap->size = xclone->size;
964         clonemap->flags = 0;
965         target = xseg_get_target(peer->xseg, pr->req);
966         strncpy(clonemap->volume, target, pr->req->targetlen);
967         clonemap->volumelen = pr->req->targetlen;
968         clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
969
970         //alloc and init map_nodes
971         unsigned long c = clonemap->size/block_size + 1;
972         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
973         if (!map_nodes){
974                 goto out_err_q;
975         }
976         int i;
977         for (i = 0; i < clonemap->size/block_size + 1; i++) {
978                 struct map_node *mn = find_object(map, i);
979                 if (mn) {
980                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
981                         map_nodes[i].objectlen = mn->objectlen;
982                 } else {
983                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
984                         map_nodes[i].objectlen = strlen(zero_block);
985                 }
986                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
987                 map_nodes[i].flags = 0;
988                 map_nodes[i].objectidx = i;
989                 map_nodes[i].map = clonemap;
990                 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
991                 r = insert_object(clonemap, &map_nodes[i]);
992                 if (r < 0){
993                         goto out_free_all;
994                 }
995         }
996         //insert map
997         r = insert_map(mapper, clonemap);
998         if ( r < 0) {
999                 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
1000                 goto out_free_all;
1001         }
1002         r = map_write(peer, pr, clonemap);
1003         if (r < 0){
1004                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1005                 goto out_remove;
1006         }
1007         else if (r == MF_PENDING) {
1008                 //maybe move this to map_write
1009                 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1010                 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1011                 mio->state = WRITING;
1012                 return 0;
1013         } else {
1014                 //unknown state
1015                 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
1016                 goto out_remove;
1017         }
1018         
1019         return 0;
1020
1021 out_remove:
1022         remove_map(mapper, clonemap);
1023 out_free_all:
1024         //FIXME not freeing allocated queues of map_nodes
1025         free(map_nodes);
1026 out_err_q:
1027         xq_free(&clonemap->pending);
1028 out_err_objhash:
1029         xhash_free(clonemap->objects);
1030 out_err_clonemap:
1031         free(clonemap);
1032 out_err:
1033         target = xseg_get_target(peer->xseg, pr->req);
1034         strncpy(buf, target, req->targetlen);
1035         buf[req->targetlen] = 0;
1036         XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
1037         fail(peer, pr);
1038         return -1;
1039 }
1040
1041 static int req2objs(struct peerd *peer, struct peer_req *pr, 
1042                                         struct map *map, int write)
1043 {
1044         char *target = xseg_get_target(peer->xseg, pr->req);
1045         uint32_t nr_objs = calc_nr_obj(pr->req);
1046         uint64_t size = sizeof(struct xseg_reply_map) + 
1047                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1048
1049         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1050         /* resize request to fit reply */
1051         char buf[XSEG_MAX_TARGETLEN];
1052         strncpy(buf, target, pr->req->targetlen);
1053         int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1054         if (r < 0) {
1055                 XSEGLOG2(&lc, E, "Cannot resize request");
1056                 return -1;
1057         }
1058         target = xseg_get_target(peer->xseg, pr->req);
1059         strncpy(target, buf, pr->req->targetlen);
1060
1061         /* structure reply */
1062         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1063         reply->cnt = nr_objs;
1064
1065         uint32_t idx = 0;
1066         uint64_t rem_size = pr->req->size;
1067         uint64_t obj_index = pr->req->offset / block_size;
1068         uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1069         uint64_t obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1070         struct map_node * mn = find_object(map, obj_index);
1071         if (!mn) {
1072                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1073                 goto out_err;
1074         }
1075         if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1076                 goto out_object_copying;
1077         if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1078                 //calc new_target, copy up object
1079                 r = copyup_object(peer, mn, pr);
1080                 if (r < 0) {
1081                         XSEGLOG2(&lc, E, "Error in copy up object");
1082                         goto out_err_copy;
1083                 }
1084                 goto out_object_copying;
1085         }
1086
1087 //      XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n", 
1088 //                              (unsigned long long) pr->req->offset, 
1089 //                              (unsigned long long) pr->req->size, 
1090 //                              block_size);
1091         strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1092         reply->segs[idx].targetlen = mn->objectlen;
1093         reply->segs[idx].offset = obj_offset;
1094         reply->segs[idx].size = obj_size;
1095 //      XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1096 //                                      (unsigned long long) reply->segs[idx].size,
1097 //                                      (unsigned long long) reply->segs[idx].offset);
1098         rem_size -= obj_size;
1099         while (rem_size > 0) {
1100                 idx++;
1101                 obj_index++;
1102                 obj_offset = 0;
1103                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1104                 rem_size -= obj_size;
1105                 mn = find_object(map, obj_index);
1106                 if (!mn) {
1107                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1108                         goto out_err;
1109                 }
1110                 if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1111                         goto out_object_copying;
1112                 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1113                         //calc new_target, copy up object
1114                         r = copyup_object(peer, mn, pr);
1115                         if (r < 0) {
1116                                 XSEGLOG2(&lc, E, "Error in copy up object");
1117                                 goto out_err_copy;
1118                         }
1119                         goto out_object_copying;
1120                 }
1121                 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1122                 reply->segs[idx].targetlen = mn->objectlen;
1123                 reply->segs[idx].offset = obj_offset;
1124                 reply->segs[idx].size = obj_size;
1125 //              XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1126 //                              (unsigned long long) reply->segs[idx].size,
1127 //                              (unsigned long long) reply->segs[idx].offset);
1128         }
1129         if (reply->cnt != (idx + 1)){
1130                 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
1131                 goto out_err;
1132         }
1133
1134         return 0;
1135
1136 out_object_copying:
1137         //printf("r2o mn: %lx\n", mn);
1138         //printf("volume %s pending on %s\n", map->volume, mn->object);
1139         //assert write
1140         if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1141                 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1142         XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1143                         mn->object, (unsigned long long) mn->objectidx, map->volume);
1144         return MF_PENDING;
1145
1146 out_err_copy:
1147 out_err:
1148         return -1;
1149 }
1150
1151 static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
1152                                 struct xseg_request *req)
1153 {
1154         struct mapperd *mapper = __get_mapperd(peer);
1155         struct mapper_io *mio = __get_mapper_io(pr);
1156         (void)mapper;
1157         (void)mio;
1158         //get_map
1159         char *target = xseg_get_target(peer->xseg, pr->req);
1160         struct map *map;
1161         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1162         if (r < 0) {
1163                 fail(peer, pr);
1164                 return -1;
1165         }
1166         else if (r == MF_PENDING)
1167                 return 0;
1168         
1169         if (map->flags & MF_MAP_DESTROYED) {
1170                 fail(peer, pr);
1171                 return 0;
1172         }
1173         
1174         //get_object
1175         r = req2objs(peer, pr, map, 0);
1176         if  (r < 0){
1177                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1178                                 map->volume, 
1179                                 (unsigned long long) pr->req->offset, 
1180                                 (unsigned long long) (pr->req->offset + pr->req->size));
1181                 fail(peer, pr);
1182         }
1183         else if (r == 0)
1184                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1185                                 map->volume, 
1186                                 (unsigned long long) pr->req->offset, 
1187                                 (unsigned long long) (pr->req->offset + pr->req->size));
1188                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1189                                 (unsigned long long) req->offset,
1190                                 (unsigned long long) req->size);
1191                 char buf[XSEG_MAX_TARGETLEN+1];
1192                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1193                 int i;
1194                 for (i = 0; i < reply->cnt; i++) {
1195                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1196                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1197                         buf[reply->segs[i].targetlen] = 0;
1198                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1199                                         (unsigned long long) reply->segs[i].offset,
1200                                         (unsigned long long) reply->segs[i].size);
1201                 }
1202                 complete(peer, pr);
1203
1204         return 0;
1205
1206
1207 }
1208
1209 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1210                                 struct xseg_request *req)
1211 {
1212         struct mapperd *mapper = __get_mapperd(peer);
1213         (void) mapper;
1214         struct mapper_io *mio = __get_mapper_io(pr);
1215         int r = 0;
1216         xqindex idx;
1217         struct map_node *mn = __get_copyup_node(mio, req);
1218         if (!mn)
1219                 goto out_err;
1220
1221         mn->flags &= ~MF_OBJECT_COPYING;
1222         if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1223                 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1224                 goto out_fail;
1225         }
1226         struct map *map = mn->map;
1227         if (!map){
1228                 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1229                 goto out_fail;
1230         }
1231         
1232         /* construct a tmp map_node for writing purposes */
1233         char *target = xseg_get_target(peer->xseg, req);
1234         struct map_node newmn = *mn;
1235         newmn.flags = MF_OBJECT_EXIST;
1236         strncpy(newmn.object, target, req->targetlen);
1237         newmn.object[req->targetlen] = 0;
1238         newmn.objectlen = req->targetlen;
1239         newmn.objectidx = mn->objectidx; 
1240         r = object_write(peer, pr, map, &newmn);
1241         if (r != MF_PENDING){
1242                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1243                                 "\n\t of map %s [%llu]",
1244                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1245                 goto out_fail;
1246         }
1247         mn->flags |= MF_OBJECT_WRITING;
1248         xseg_put_request(peer->xseg, req, pr->portno);
1249         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1250         return 0;
1251
1252 out_fail:
1253         xseg_put_request(peer->xseg, req, pr->portno);
1254         __set_copyup_node(mio, req, NULL);
1255         while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1256                 struct peer_req * preq = (struct peer_req *) idx;
1257                 fail(peer, preq);
1258         }
1259         return 0;
1260
1261 out_err:
1262         XSEGLOG2(&lc, E, "Cannot get map node");
1263         return -1;
1264 }
1265
1266 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1267                                 struct xseg_request *req)
1268 {
1269         xqindex idx;
1270         struct mapperd *mapper = __get_mapperd(peer);
1271         struct mapper_io *mio = __get_mapper_io(pr);
1272         //assert req->op = X_WRITE;
1273         char *target = xseg_get_target(peer->xseg, req);
1274         (void)target;
1275         (void)mapper;
1276         //printf("handle object write replyi\n");
1277         struct map_node *mn = __get_copyup_node(mio, req);
1278         if (!mn)
1279                 goto out_err;
1280         
1281         __set_copyup_node(mio, req, NULL);
1282         
1283         //assert mn->flags & MF_OBJECT_WRITING
1284         mn->flags &= ~MF_OBJECT_WRITING;
1285         if (req->state & XS_FAILED)
1286                 goto out_fail;
1287
1288         struct map_node tmp;
1289         char *data = xseg_get_data(peer->xseg, req);
1290         map_to_object(&tmp, data);
1291         mn->flags |= MF_OBJECT_EXIST;
1292         if (mn->flags != MF_OBJECT_EXIST){
1293                 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1294                 return *(int *) 0;
1295         }
1296         //assert mn->flags & MF_OBJECT_EXIST
1297         strncpy(mn->object, tmp.object, tmp.objectlen);
1298         mn->object[tmp.objectlen] = 0;
1299         mn->objectlen = tmp.objectlen;
1300         xseg_put_request(peer->xseg, req, pr->portno);
1301
1302         XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1303         uint64_t qsize = xq_count(&mn->pending);
1304         while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1305                 qsize--;
1306                 struct peer_req * preq = (struct peer_req *) idx;
1307                 my_dispatch(peer, preq, preq->req);
1308         }
1309         return 0;
1310
1311 out_fail:
1312         XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1313         xseg_put_request(peer->xseg, req, pr->portno);
1314         while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1315                 struct peer_req *preq = (struct peer_req *) idx;
1316                 fail(peer, preq);
1317         }
1318         return 0;
1319
1320 out_err:
1321         XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1322         xseg_put_request(peer->xseg, req, pr->portno);
1323         return -1;
1324 }
1325
1326 static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
1327                                 struct xseg_request *req)
1328 {
1329         struct mapperd *mapper = __get_mapperd(peer);
1330         struct mapper_io *mio = __get_mapper_io(pr);
1331         (void) mapper;
1332         (void) mio;
1333         /* handle copy up replies separately */
1334         if (req->op == X_COPY){
1335                 if (handle_copyup(peer, pr, req) < 0){
1336                         XSEGLOG2(&lc, E, "Handle copy up returned error");
1337                         fail(peer, pr);
1338                         return -1;
1339                 } else {
1340                         return 0;
1341                 }
1342         }
1343         else if(req->op == X_WRITE){
1344                 /* handle replies of object write operations */
1345                 if (handle_objectwrite(peer, pr, req) < 0) {
1346                         XSEGLOG2(&lc, E, "Handle object write returned error");
1347                         fail(peer, pr);
1348                         return -1;
1349                 } else {
1350                         return 0;
1351                 }
1352         }
1353
1354         char *target = xseg_get_target(peer->xseg, pr->req);
1355         struct map *map;
1356         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1357         if (r < 0) {
1358                 fail(peer, pr);
1359                 return -1;
1360         }
1361         else if (r == MF_PENDING)
1362                 return 0;
1363         
1364         if (map->flags & MF_MAP_DESTROYED) {
1365                 fail(peer, pr);
1366                 return 0;
1367         }
1368
1369         r = req2objs(peer, pr, map, 1);
1370         if (r < 0){
1371                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1372                                 map->volume, 
1373                                 (unsigned long long) pr->req->offset, 
1374                                 (unsigned long long) (pr->req->offset + pr->req->size));
1375                 fail(peer, pr);
1376         }
1377         if (r == 0){
1378                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1379                                 map->volume, 
1380                                 (unsigned long long) pr->req->offset, 
1381                                 (unsigned long long) (pr->req->offset + pr->req->size));
1382                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1383                                 (unsigned long long) req->offset,
1384                                 (unsigned long long) req->size);
1385                 char buf[XSEG_MAX_TARGETLEN+1];
1386                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1387                 int i;
1388                 for (i = 0; i < reply->cnt; i++) {
1389                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1390                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1391                         buf[reply->segs[i].targetlen] = 0;
1392                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1393                                         (unsigned long long) reply->segs[i].offset,
1394                                         (unsigned long long) reply->segs[i].size);
1395                 }
1396                 complete(peer, pr);
1397         }
1398         //else copyup pending, wait for pr restart
1399
1400         return 0;
1401 }
1402
1403 static int handle_snap(struct peerd *peer, struct peer_req *pr, 
1404                                 struct xseg_request *req)
1405 {
1406         fail(peer, pr);
1407         return 0;
1408 }
1409
1410 static int handle_info(struct peerd *peer, struct peer_req *pr, 
1411                                 struct xseg_request *req)
1412 {
1413         struct mapperd *mapper = __get_mapperd(peer);
1414         struct mapper_io *mio = __get_mapper_io(pr);
1415         (void) mapper;
1416         (void) mio;
1417         char *target = xseg_get_target(peer->xseg, pr->req);
1418         if (!target) {
1419                 fail(peer, pr);
1420                 return 0;
1421         }
1422         //printf("Handle info\n");
1423         struct map *map;
1424         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1425         if (r < 0) {
1426                 fail(peer, pr);
1427                 return -1;
1428         }
1429         else if (r == MF_PENDING)
1430                 return 0;
1431         if (map->flags & MF_MAP_DESTROYED) {
1432                 fail(peer, pr);
1433                 return 0;
1434         }
1435         
1436         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1437         xinfo->size = map->size;
1438         complete(peer, pr);
1439
1440         return 0;
1441 }
1442
1443 static int delete_object(struct peerd *peer, struct peer_req *pr,
1444                                 struct map_node *mn)
1445 {
1446         void *dummy;
1447         struct mapperd *mapper = __get_mapperd(peer);
1448         struct mapper_io *mio = __get_mapper_io(pr);
1449
1450         mio->delobj = mn->objectidx;
1451         if (xq_count(&mn->pending) != 0) {
1452                 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1453                 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1454                                 mn->object);
1455                 return MF_PENDING;
1456         }
1457
1458         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1459                                                         mapper->bportno, X_ALLOC);
1460         if (!req)
1461                 goto out_err;
1462         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1463         if (r < 0)
1464                 goto out_put;
1465         char *target = xseg_get_target(peer->xseg, req);
1466         strncpy(target, mn->object, req->targetlen);
1467         req->op = X_DELETE;
1468         req->size = req->datalen;
1469         req->offset = 0;
1470
1471         r = xseg_set_req_data(peer->xseg, req, pr);
1472         if (r < 0)
1473                 goto out_put;
1474         __set_copyup_node(mio, req, mn);
1475         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1476         if (p == NoPort)
1477                 goto out_unset;
1478         r = xseg_signal(peer->xseg, p);
1479         mn->flags |= MF_OBJECT_DELETING;
1480         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1481         return MF_PENDING;
1482
1483 out_unset:
1484         xseg_get_req_data(peer->xseg, req, &dummy);
1485 out_put:
1486         xseg_put_request(peer->xseg, req, pr->portno);
1487 out_err:
1488         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1489         return -1;
1490 }
1491
1492 /*
1493  * Find next object for deletion. Start searching on idx mio->delobj.
1494  * Skip non existing map_nodes, free_resources and skip non-existing objects
1495  * Wait for all pending operations on the object, before moving forward to the 
1496  * next object.
1497  *
1498  * Return MF_PENDING if theres is a pending operation on the next object
1499  * or zero if there is no next object
1500  */
1501 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
1502                                 struct map *map)
1503 {
1504         struct mapperd *mapper = __get_mapperd(peer);
1505         struct mapper_io *mio = __get_mapper_io(pr);
1506         uint64_t idx = mio->delobj;
1507         struct map_node *mn;
1508         int r;
1509 retry:
1510         while (idx < calc_map_obj(map)) {
1511                 mn = find_object(map, idx);
1512                 if (!mn) {
1513                         idx++;
1514                         goto retry;
1515                 }
1516                 mio->delobj = idx;
1517                 if (xq_count(&mn->pending) != 0) {
1518                         __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1519                         XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1520                                         mn->object);
1521                         return MF_PENDING;
1522                 }
1523                 if (mn->flags & MF_OBJECT_EXIST){
1524                         r = delete_object(peer, pr, mn);
1525                         if (r < 0) {
1526                                 /* on error, just log it, release resources and
1527                                  * proceed to the next object
1528                                  */
1529                                 XSEGLOG2(&lc, E, "Object %s delete object return error"
1530                                                 "\n\t Map: %s [%llu]", 
1531                                                 mn->object, mn->map->volume, 
1532                                                 (unsigned long long) mn->objectidx);
1533                                 xq_free(&mn->pending);
1534                         }
1535                         else if (r == MF_PENDING){
1536                                 return r;
1537                         }
1538                 } else {
1539                         xq_free(&mn->pending);
1540                 }
1541                 idx++;
1542         }
1543         return 0;
1544 }
1545
1546 static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
1547                                  struct map_node *mn, int err)
1548 {
1549         struct mapperd *mapper = __get_mapperd(peer);
1550         struct mapper_io *mio = __get_mapper_io(pr);
1551         uint64_t idx;
1552         struct map *map = mn->map;
1553         int r;
1554         (void) mio;
1555         //if object deletion failed, map deletion must continue
1556         //and report OK, since map block has been deleted succesfully
1557         //so, no check for err
1558
1559         //assert object flags OK
1560         //free map_node_resources
1561         mn->flags &= ~MF_OBJECT_DELETING;
1562         xq_free(&mn->pending);
1563
1564         mio->delobj++;
1565         r = delete_next_object(peer, pr, map);
1566         if (r != MF_PENDING){
1567                 /* if there is no next object to delete, remove the map block
1568                  * from memory
1569                  */
1570
1571                 //assert map flags OK
1572                 map->flags |= MF_MAP_DESTROYED;
1573                 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1574                 //make all pending requests on map to fail
1575                 uint64_t qsize = xq_count(&map->pending);
1576                 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1577                         qsize--;
1578                         struct peer_req * preq = (struct peer_req *) idx;
1579                         my_dispatch(peer, preq, preq->req);
1580                 }
1581                 //free map resources;
1582                 remove_map(mapper, map);
1583                 mn = find_object(map, 0);
1584                 free(mn);
1585                 xq_free(&map->pending);
1586                 free(map);
1587         }
1588         XSEGLOG2(&lc, I, "Handle object delete OK");
1589         return 0;
1590 }
1591
1592 static int delete_map(struct peerd *peer, struct peer_req *pr,
1593                         struct map *map)
1594 {
1595         void *dummy;
1596         struct mapperd *mapper = __get_mapperd(peer);
1597         struct mapper_io *mio = __get_mapper_io(pr);
1598         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1599                                                         mapper->mbportno, X_ALLOC);
1600         if (!req)
1601                 goto out_err;
1602         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1603         if (r < 0)
1604                 goto out_put;
1605         char *target = xseg_get_target(peer->xseg, req);
1606         strncpy(target, map->volume, req->targetlen);
1607         req->op = X_DELETE;
1608         req->size = req->datalen;
1609         req->offset = 0;
1610
1611         r = xseg_set_req_data(peer->xseg, req, pr);
1612         if (r < 0)
1613                 goto out_put;
1614         __set_copyup_node(mio, req, NULL);
1615         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1616         if (p == NoPort)
1617                 goto out_unset;
1618         r = xseg_signal(peer->xseg, p);
1619         map->flags |= MF_MAP_DELETING;
1620         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1621         return MF_PENDING;
1622
1623 out_unset:
1624         xseg_get_req_data(peer->xseg, req, &dummy);
1625 out_put:
1626         xseg_put_request(peer->xseg, req, pr->portno);
1627 out_err:
1628         XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
1629         return -1;
1630 }
1631
1632 static int handle_map_delete(struct peerd *peer, struct peer_req *pr, 
1633                                 struct map *map, int err)
1634 {
1635         struct mapperd *mapper = __get_mapperd(peer);
1636         struct mapper_io *mio = __get_mapper_io(pr);
1637         xqindex idx;
1638         int r;
1639         (void) mio;
1640         map->flags &= ~MF_MAP_DELETING;
1641         if (err) {
1642                 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1643                 //dispatch all pending
1644                 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1645                         struct peer_req * preq = (struct peer_req *) idx;
1646                         my_dispatch(peer, preq, preq->req);
1647                 }
1648         } else {
1649                 map->flags |= MF_MAP_DESTROYED;
1650                 //delete all objects
1651                 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1652                 mio->delobj = 0;
1653                 r = delete_next_object(peer, pr, map);
1654                 if (r != MF_PENDING){
1655                         /* if there is no next object to delete, remove the map block
1656                          * from memory
1657                          */
1658                         //assert map flags OK
1659                         map->flags |= MF_MAP_DESTROYED;
1660                         XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1661                         //make all pending requests on map to fail
1662                         uint64_t qsize = xq_count(&map->pending);
1663                         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1664                                 qsize--;
1665                                 struct peer_req * preq = (struct peer_req *) idx;
1666                                 my_dispatch(peer, preq, preq->req);
1667                         }
1668                         //free map resources;
1669                         remove_map(mapper, map);
1670                         struct map_node *mn = find_object(map, 0);
1671                         if (mn)
1672                                 free(mn);
1673                         xq_free(&map->pending);
1674                         free(map);
1675                 }
1676         }
1677         return 0;
1678 }
1679
1680 static int handle_delete(struct peerd *peer, struct peer_req *pr, 
1681                                 struct xseg_request *req)
1682 {
1683         struct mapperd *mapper = __get_mapperd(peer);
1684         struct mapper_io *mio = __get_mapper_io(pr);
1685         struct map_node *mn;
1686         struct map *map;
1687         int err = 0;
1688         if (req->state & XS_FAILED && !(req->state &XS_SERVED)) 
1689                 err = 1;
1690         
1691         mn = __get_copyup_node(mio, req);
1692         __set_copyup_node(mio, req, NULL);
1693         char *target = xseg_get_target(peer->xseg, req);
1694         if (!mn) {
1695                 //map block delete
1696                 map = find_map(mapper, target, req->targetlen);
1697                 if (!map) {
1698                         xseg_put_request(peer->xseg, req, pr->portno);
1699                         return -1;
1700                 }
1701                 handle_map_delete(peer, pr, map, err);
1702         } else {
1703                 //object delete
1704                 map = mn->map;
1705                 if (!map) {
1706                         xseg_put_request(peer->xseg, req, pr->portno);
1707                         return -1;
1708                 }
1709                 handle_object_delete(peer, pr, mn, err);
1710         }
1711         xseg_put_request(peer->xseg, req, pr->portno);
1712         return 0;
1713 }
1714
1715 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
1716                                 struct xseg_request *req)
1717 {
1718         struct mapperd *mapper = __get_mapperd(peer);
1719         struct mapper_io *mio = __get_mapper_io(pr);
1720         (void) mapper;
1721         int r;
1722         char buf[XSEG_MAX_TARGETLEN+1];
1723         char *target = xseg_get_target(peer->xseg, pr->req);
1724
1725         strncpy(buf, target, pr->req->targetlen);
1726         buf[req->targetlen] = 0;
1727
1728         XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1729                         (unsigned long) pr, (unsigned long) pr->req,
1730                         (unsigned long) req);
1731         XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
1732         if (pr->req != req && req->op == X_DELETE) {
1733                 //assert mio->state == DELETING
1734                 r = handle_delete(peer, pr, req);
1735                 if (r < 0) {
1736                         XSEGLOG2(&lc, E, "Handle delete returned error");
1737                         fail(peer, pr);
1738                         return -1;
1739                 } else {
1740                         return 0;
1741                 }
1742         }
1743
1744         struct map *map;
1745         r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1746         if (r < 0) {
1747                 fail(peer, pr);
1748                 return -1;
1749         }
1750         else if (r == MF_PENDING)
1751                 return 0;
1752         if (map->flags & MF_MAP_DESTROYED) {
1753                 if (mio->state == DELETING){
1754                         XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1755                         complete(peer, pr);
1756                 }
1757                 else{
1758                         XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1759                         fail(peer, pr);
1760                 }
1761                 return 0;
1762         }
1763         if (mio->state == DELETING) {
1764                 //continue deleting map objects;
1765                 r = delete_next_object(peer ,pr, map);
1766                 if (r != MF_PENDING){
1767                         complete(peer, pr);
1768                 }
1769                 return 0;
1770         }
1771         //delete map block
1772         r = delete_map(peer, pr, map);
1773         if (r < 0) {
1774                 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1775                 fail(peer, pr);
1776                 return -1;
1777         } else if (r == MF_PENDING) {
1778                 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1779                 __xq_append_tail(&map->pending, (xqindex) pr);
1780                 mio->state = DELETING;
1781                 return 0;
1782         }
1783         //unreachable
1784         XSEGLOG2(&lc, E, "Destroy unreachable");
1785         fail(peer, pr);
1786         return 0;
1787 }
1788
1789 static int handle_dropcache(struct peerd *peer, struct peer_req *pr, 
1790                                 struct xseg_request *req)
1791 {
1792         struct mapperd *mapper = __get_mapperd(peer);
1793         struct mapper_io *mio = __get_mapper_io(pr);
1794         (void) mapper;
1795         (void) mio;
1796         char *target = xseg_get_target(peer->xseg, pr->req);
1797         if (!target) {
1798                 fail(peer, pr);
1799                 return 0;
1800         }
1801
1802         struct map *map = find_map(mapper, target, pr->req->targetlen);
1803         if (!map){
1804                 complete(peer, pr);
1805                 return 0;
1806         } else if (map->flags & MF_MAP_DESTROYED) {
1807                 complete(peer, pr);
1808                 return 0;
1809         } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1810                 __xq_append_tail(&map->pending, (xqindex) pr);
1811                 return 0;
1812         }
1813
1814         if (mio->state != DROPPING_CACHE) {
1815                 /* block all future operations on the map */
1816                 map->flags |= MF_MAP_DROPPING_CACHE;
1817                 mio->dcobj = 0;
1818                 mio->state = DROPPING_CACHE;
1819                 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1820         } else {
1821                 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1822         }
1823
1824         struct map_node *mn; 
1825         uint64_t i;
1826         for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1827                 mn = find_object(map, i);
1828                 if (!mn)
1829                         continue;
1830                 mio->dcobj = i;
1831                 if (xq_count(&mn->pending) != 0){
1832                         XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu", 
1833                                 map->volume, (unsigned long long) mn->objectidx);
1834                         __xq_append_tail(&mn->pending, (xqindex) pr);
1835                         return 0;
1836                 }
1837                 xq_free(&mn->pending);
1838                 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu", 
1839                                 map->volume, (unsigned long long) mn->objectidx);
1840         }
1841         remove_map(mapper, map);
1842         //dispatch pending
1843         uint64_t qsize = xq_count(&map->pending);
1844         while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1845                 qsize--;
1846                 struct peer_req * preq = (struct peer_req *) i;
1847                 my_dispatch(peer, preq, preq->req);
1848         }
1849         XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1850         
1851         //free map resources;
1852         mn = find_object(map, 0);
1853         if (mn)
1854                 free(mn);
1855         xq_free(&map->pending);
1856         free(map);
1857
1858         complete(peer, pr);
1859
1860         return 0;
1861 }
1862
1863 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1864 {
1865         struct mapperd *mapper = __get_mapperd(peer);
1866         (void) mapper;
1867         struct mapper_io *mio = __get_mapper_io(pr);
1868         (void) mio;
1869
1870         if (req->op == X_READ) {
1871                 /* catch map reads requests here */
1872                 handle_mapread(peer, pr, req);
1873                 return 0;
1874         }
1875
1876         switch (pr->req->op) {
1877                 /* primary xseg operations of mapper */
1878                 case X_CLONE: handle_clone(peer, pr, req); break;
1879                 case X_MAPR: handle_mapr(peer, pr, req); break;
1880                 case X_MAPW: handle_mapw(peer, pr, req); break;
1881 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1882                 case X_INFO: handle_info(peer, pr, req); break;
1883                 case X_DELETE: handle_destroy(peer, pr, req); break;
1884                 case X_CLOSE: handle_dropcache(peer, pr, req); break;
1885                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1886         }
1887         return 0;
1888 }
1889
1890 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1891 {
1892         struct mapperd *mapper = __get_mapperd(peer);
1893         (void) mapper;
1894         struct mapper_io *mio = __get_mapper_io(pr);
1895         (void) mio;
1896
1897         if (pr->req == req)
1898                 mio->state = ACCEPTED;
1899         my_dispatch(peer, pr ,req);
1900         return 0;
1901 }
1902
1903 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1904 {
1905         int i;
1906         unsigned char buf[SHA256_DIGEST_SIZE];
1907         char *zero;
1908
1909         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1910
1911         /* Version check should be the very first call because it
1912           makes sure that important subsystems are intialized. */
1913         gcry_check_version (NULL);
1914      
1915         /* Disable secure memory.  */
1916         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1917      
1918         /* Tell Libgcrypt that initialization has completed. */
1919         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1920
1921         /* calculate out magic sha hash value */
1922         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1923
1924         /* calculate zero block */
1925         //FIXME check hash value
1926         zero = malloc(block_size);
1927         memset(zero, 0, block_size);
1928         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1929         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1930                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1931         printf("%s \n", zero_block);
1932         free(zero);
1933
1934         //FIXME error checks
1935         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1936         mapper->hashmaps = xhash_new(3, STRING);
1937         peer->priv = mapper;
1938         
1939         for (i = 0; i < peer->nr_ops; i++) {
1940                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1941                 mio->copyups_nodes = xhash_new(3, INTEGER);
1942                 mio->copyups = 0;
1943                 mio->err = 0;
1944                 peer->peer_reqs[i].priv = mio;
1945         }
1946
1947         for (i = 0; i < argc; i++) {
1948                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1949                         mapper->bportno = atoi(argv[i+1]);
1950                         i += 1;
1951                         continue;
1952                 }
1953                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1954                         mapper->mbportno = atoi(argv[i+1]);
1955                         i += 1;
1956                         continue;
1957                 }
1958                 /* enforce only one thread */
1959                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1960                         int t = atoi(argv[i+1]);
1961                         if (t != 1) {
1962                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1963                                 return -1;
1964                         }
1965                         i += 1;
1966                         continue;
1967                 }
1968         }
1969
1970         const struct sched_param param = { .sched_priority = 99 };
1971         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1972
1973
1974 //      test_map(peer);
1975
1976         return 0;
1977 }
1978
1979 void print_obj(struct map_node *mn)
1980 {
1981         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
1982                         (unsigned long long) mn->objectidx, mn->object, 
1983                         (unsigned int) mn->objectlen, 
1984                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
1985 }
1986
1987 void print_map(struct map *m)
1988 {
1989         uint64_t nr_objs = m->size/block_size;
1990         if (m->size % block_size)
1991                 nr_objs++;
1992         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
1993                         m->volume, m->volumelen, 
1994                         (unsigned long long) m->size, 
1995                         (unsigned long long) nr_objs);
1996         uint64_t i;
1997         struct map_node *mn;
1998         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1999                 return;
2000         for (i = 0; i < nr_objs; i++) {
2001                 mn = find_object(m, i);
2002                 if (!mn){
2003                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2004                         continue;
2005                 }
2006                 print_obj(mn);
2007         }
2008 }
2009
2010 /*
2011 void test_map(struct peerd *peer)
2012 {
2013         int i,j, ret;
2014         //struct sha256_ctx sha256ctx;
2015         unsigned char buf[SHA256_DIGEST_SIZE];
2016         char buf_new[XSEG_MAX_TARGETLEN + 20];
2017         struct map *m = malloc(sizeof(struct map));
2018         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2019         m->volume[XSEG_MAX_TARGETLEN] = 0;
2020         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2021         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2022         m->volumelen = XSEG_MAX_TARGETLEN;
2023         m->size = 100*block_size;
2024         m->objects = xhash_new(3, INTEGER);
2025         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2026         for (i = 0; i < 100; i++) {
2027                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2028                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2029                 
2030                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2031                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2032                 }
2033                 map_node[i].objectidx = i;
2034                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2035                 map_node[i].flags = MF_OBJECT_EXIST;
2036                 ret = insert_object(m, &map_node[i]);
2037         }
2038
2039         char *data = malloc(block_size);
2040         mapheader_to_map(m, data);
2041         uint64_t pos = mapheader_size;
2042
2043         for (i = 0; i < 100; i++) {
2044                 map_node = find_object(m, i);
2045                 if (!map_node){
2046                         printf("no object node %d \n", i);
2047                         exit(1);
2048                 }
2049                 object_to_map(data+pos, map_node);
2050                 pos += objectsize_in_map;
2051         }
2052 //      print_map(m);
2053
2054         struct map *m2 = malloc(sizeof(struct map));
2055         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2056         m->volume[XSEG_MAX_TARGETLEN] = 0;
2057         m->volumelen = XSEG_MAX_TARGETLEN;
2058
2059         m2->objects = xhash_new(3, INTEGER);
2060         ret = read_map(peer, m2, data);
2061 //      print_map(m2);
2062
2063         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2064         ssize_t r, sum = 0;
2065         while (sum < block_size) {
2066                 r = write(fd, data + sum, block_size -sum);
2067                 if (r < 0){
2068                         perror("write");
2069                         printf("write error\n");
2070                         exit(1);
2071                 } 
2072                 sum += r;
2073         }
2074         close(fd);
2075         map_node = find_object(m, 0);
2076         free(map_node);
2077         free(m);
2078 }
2079 */