add dispatch reason to peers skeleton
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <mpeer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19
20 #define MF_PENDING 1
21
22 #define SHA256_DIGEST_SIZE 32
23 /* hex representation of sha256 value takes up double the sha256 size */
24 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
25
26 #define block_size (1<<22) //FIXME this should be defined here?
27 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
28 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
29
30 #define MF_OBJECT_EXIST         (1 << 0)
31 #define MF_OBJECT_COPYING       (1 << 1)
32 #define MF_OBJECT_WRITING       (1 << 2)
33 #define MF_OBJECT_DELETING      (1 << 3)
34
35 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
36 extern struct log_ctx lc;
37
38 char *magic_string = "This a magic string. Please hash me";
39 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
40 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
41
42 //internal mapper states
43 enum mapper_state {
44         ACCEPTED = 0,
45         WRITING = 1,
46         COPYING = 2,
47         DELETING = 3,
48         DROPPING_CACHE = 4
49 };
50
51 struct map_node {
52         uint32_t flags;
53         uint32_t objectidx;
54         uint32_t objectlen;
55         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
56         struct xq pending;                      /* pending peer_reqs on this object */
57         struct map *map;
58 };
59
60 #define MF_MAP_LOADING          (1 << 0)
61 #define MF_MAP_DESTROYED        (1 << 1)
62 #define MF_MAP_WRITING          (1 << 2)
63 #define MF_MAP_DELETING         (1 << 3)
64 #define MF_MAP_DROPPING_CACHE   (1 << 4)
65
66 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
67                                         MF_MAP_DROPPING_CACHE)
68
69 struct map {
70         uint32_t flags;
71         uint64_t size;
72         uint32_t volumelen;
73         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
74         xhash_t *objects;       /* obj_index --> map_node */
75         struct xq pending;      /* pending peer_reqs on this map */
76 };
77
78 struct mapperd {
79         xport bportno;          /* blocker that accesses data */
80         xport mbportno;         /* blocker that accesses maps */
81         xhash_t *hashmaps; // hash_function(target) --> struct map
82 };
83
84 struct mapper_io {
85         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
86         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
87         struct map_node *copyup_node;
88         int err;                        /* error flag */
89         uint64_t delobj;
90         uint64_t dcobj;
91         enum mapper_state state;
92 };
93
94 void print_map(struct map *m);
95
96 /*
97  * Helper functions
98  */
99
100 static inline struct mapperd * __get_mapperd(struct peerd *peer)
101 {
102         return (struct mapperd *) peer->priv;
103 }
104
105 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
106 {
107         return (struct mapper_io *) pr->priv;
108 }
109
110 static inline uint64_t calc_map_obj(struct map *map)
111 {
112         uint64_t nr_objs = map->size / block_size;
113         if (map->size % block_size)
114                 nr_objs++;
115         return nr_objs;
116 }
117
118 static uint32_t calc_nr_obj(struct xseg_request *req)
119 {
120         unsigned int r = 1;
121         uint64_t rem_size = req->size;
122         uint64_t obj_offset = req->offset & (block_size -1); //modulo
123         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
124         rem_size -= obj_size;
125         while (rem_size > 0) {
126                 obj_size = (rem_size > block_size) ? block_size : rem_size;
127                 rem_size -= obj_size;
128                 r++;
129         }
130
131         return r;
132 }
133
134 /*
135  * Maps handling functions
136  */
137
138 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
139 {
140         int r;
141         struct map *m = NULL;
142         char buf[XSEG_MAX_TARGETLEN+1];
143         //assert targetlen <= XSEG_MAX_TARGETLEN
144         strncpy(buf, target, targetlen);
145         buf[targetlen] = 0;
146         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
147         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
148         if (r < 0)
149                 return NULL;
150         return m;
151 }
152
153
154 static int insert_map(struct mapperd *mapper, struct map *map)
155 {
156         int r = -1;
157         
158         if (find_map(mapper, map->volume, map->volumelen)){
159                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
160                 goto out;
161         }
162
163         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
164                         map->volume, strlen(map->volume), (unsigned long) map);
165         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
166         while (r == -XHASH_ERESIZE) {
167                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
168                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
169                 if (!new_hashmap){
170                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
171                                         (unsigned long long) shift);
172                         goto out;
173                 }
174                 mapper->hashmaps = new_hashmap;
175                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
176         }
177 out:
178         return r;
179 }
180
181 static int remove_map(struct mapperd *mapper, struct map *map)
182 {
183         int r = -1;
184         
185         //assert no pending pr on map
186         
187         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
188         while (r == -XHASH_ERESIZE) {
189                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
190                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
191                 if (!new_hashmap){
192                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
193                                         (unsigned long long) shift);
194                         goto out;
195                 }
196                 mapper->hashmaps = new_hashmap;
197                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
198         }
199 out:
200         return r;
201 }
202
203 /* async map load */
204 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, 
205                         uint32_t targetlen)
206 {
207         int r;
208         xport p;
209         struct xseg_request *req;
210         struct mapperd *mapper = __get_mapperd(peer);
211         void *dummy;
212
213         struct map *m = find_map(mapper, target, targetlen);
214         if (!m) {
215                 m = malloc(sizeof(struct map));
216                 if (!m){
217                         XSEGLOG2(&lc, E, "Cannot allocate map ");
218                         goto out_err;
219                 }
220                 m->size = -1;
221                 strncpy(m->volume, target, targetlen);
222                 m->volume[targetlen] = 0;
223                 m->volumelen = targetlen;
224                 m->flags = MF_MAP_LOADING;
225                 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
226                 if (!qidx) {
227                         XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
228                                         m->volume);
229                         goto out_map;
230                 }
231                 m->objects = xhash_new(3, INTEGER); 
232                 if (!m->objects){
233                         XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
234                                         m->volume);
235                         goto out_q;
236                 }
237                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
238         } else {
239                 goto map_exists;
240         }
241
242         r = insert_map(mapper, m);
243         if (r < 0)  
244                 goto out_hash;
245         
246
247         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
248         if (!req){
249                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
250                                 m->volume);
251                 goto out_fail;
252         }
253
254         r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
255         if (r < 0){
256                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
257                                 m->volume);
258                 goto out_put;
259         }
260
261         char *reqtarget = xseg_get_target(peer->xseg, req);
262         if (!reqtarget)
263                 goto out_put;
264         strncpy(reqtarget, target, req->targetlen);
265         req->op = X_READ;
266         req->size = block_size;
267         req->offset = 0;
268         r = xseg_set_req_data(peer->xseg, req, pr);
269         if (r < 0){
270                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
271                                 m->volume);
272                 goto out_put;
273         }
274         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
275         if (p == NoPort){ 
276                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
277                                 m->volume);
278                 goto out_unset;
279         }
280         r = xseg_signal(peer->xseg, p);
281         
282         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
283         return 0;
284
285 out_unset:
286         xseg_get_req_data(peer->xseg, req, &dummy);
287 out_put:
288         xseg_put_request(peer->xseg, req, pr->portno);
289
290 out_fail:
291         remove_map(mapper, m);
292         xqindex idx;
293         while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
294                 fail(peer, (struct peer_req *) idx);
295         }
296
297 out_hash:
298         xhash_free(m->objects);
299 out_q:
300         xq_free(&m->pending);
301 out_map:
302         XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
303         free(m);
304 out_err:
305         return -1;
306
307 map_exists:
308         //assert map loading when this is reached
309         if (m->flags & MF_MAP_LOADING) {
310                 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
311                                 "Adding to pending queue", m->volume);
312                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
313         }
314         else {
315                 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
316                 dispatch(peer, pr, pr->req, internal);
317         }
318         return 0;
319 }
320
321
322 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
323                                 char *target, uint32_t targetlen, struct map **m)
324 {
325         struct mapperd *mapper = __get_mapperd(peer);
326         int r;
327         *m = find_map(mapper, target, targetlen);
328         if (*m) {
329                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
330                 if ((*m)->flags & MF_MAP_NOT_READY) {
331                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
332                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
333                         return MF_PENDING;
334                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
335                 //      return -1;
336                 // 
337                 }else {
338                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
339                         return 0;
340                 }
341         }
342         r = load_map(peer, pr, target, targetlen);
343         if (r < 0)
344                 return -1; //error
345         return MF_PENDING;      
346 }
347
348 /*
349  * Object handling functions
350  */
351
352 struct map_node *find_object(struct map *map, uint64_t obj_index)
353 {
354         struct map_node *mn;
355         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
356         if (r < 0)
357                 return NULL;
358         return mn;
359 }
360
361 static int insert_object(struct map *map, struct map_node *mn)
362 {
363         //FIXME no find object first
364         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
365         if (r == -XHASH_ERESIZE) {
366                 unsigned long shift = xhash_grow_size_shift(map->objects);
367                 map->objects = xhash_resize(map->objects, shift, NULL);
368                 if (!map->objects)
369                         return -1;
370                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
371         }
372         return r;
373 }
374
375
376 /*
377  * map read/write functions 
378  */
379 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
380 {
381         int i;
382         //hexlify sha256 value
383         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
384                 sprintf(mn->object+2*i, "%02x", buf[i]);
385         }
386
387         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
388         mn->objectlen = SHA256_DIGEST_SIZE * 2;
389         mn->flags = MF_OBJECT_EXIST;
390 }
391
392 static inline void map_to_object(struct map_node *mn, char *buf)
393 {
394         char c = buf[0];
395         mn->flags = 0;
396         if (c)
397                 mn->flags |= MF_OBJECT_EXIST;
398         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
399         mn->object[XSEG_MAX_TARGETLEN] = 0;
400         mn->objectlen = strlen(mn->object);
401 }
402
403 static inline void object_to_map(char* buf, struct map_node *mn)
404 {
405         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
406         memcpy(buf+1, mn->object, mn->objectlen);
407         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
408 }
409
410 static inline void mapheader_to_map(struct map *m, char *buf)
411 {
412         uint64_t pos = 0;
413         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
414         pos += SHA256_DIGEST_SIZE;
415         memcpy(buf + pos, &m->size, sizeof(m->size));
416         pos += sizeof(m->size);
417 }
418
419
420 static int object_write(struct peerd *peer, struct peer_req *pr, 
421                                 struct map *map, struct map_node *mn)
422 {
423         void *dummy;
424         struct mapperd *mapper = __get_mapperd(peer);
425         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
426                                                         mapper->mbportno, X_ALLOC);
427         if (!req){
428                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
429                                 "(Map: %s [%llu]",
430                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
431                 goto out_err;
432         }
433         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
434         if (r < 0){
435                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
436                                 "(Map: %s [%llu]",
437                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
438                 goto out_put;
439         }
440         char *target = xseg_get_target(peer->xseg, req);
441         strncpy(target, map->volume, req->targetlen);
442         req->size = objectsize_in_map;
443         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
444         req->op = X_WRITE;
445         char *data = xseg_get_data(peer->xseg, req);
446         object_to_map(data, mn);
447
448         r = xseg_set_req_data(peer->xseg, req, pr);
449         if (r < 0){
450                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
451                                 "(Map: %s [%llu]",
452                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
453                 goto out_put;
454         }
455         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
456         if (p == NoPort){
457                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
458                                 "(Map: %s [%llu]",
459                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
460                 goto out_unset;
461         }
462         r = xseg_signal(peer->xseg, p);
463         if (r < 0)
464                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
465
466         XSEGLOG2(&lc, I, "Writing object %s \n\t"
467                         "Map: %s [%llu]",
468                         mn->object, map->volume, (unsigned long long) mn->objectidx);
469
470         return MF_PENDING;
471
472 out_unset:
473         xseg_get_req_data(peer->xseg, req, &dummy);
474 out_put:
475         xseg_put_request(peer->xseg, req, pr->portno);
476 out_err:
477         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
478                         "(Map: %s [%llu]",
479                         mn->object, map->volume, (unsigned long long) mn->objectidx);
480         return -1;
481 }
482
483 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
484 {
485         void *dummy;
486         struct mapperd *mapper = __get_mapperd(peer);
487         struct map_node *mn;
488         uint64_t i, pos, max_objidx = calc_map_obj(map);
489         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
490                                                         mapper->mbportno, X_ALLOC);
491         if (!req){
492                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
493                 goto out_err;
494         }
495         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
496                                         mapheader_size + max_objidx * objectsize_in_map);
497         if (r < 0){
498                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
499                 goto out_put;
500         }
501         char *target = xseg_get_target(peer->xseg, req);
502         strncpy(target, map->volume, req->targetlen);
503         char *data = xseg_get_data(peer->xseg, req);
504         mapheader_to_map(map, data);
505         pos = mapheader_size;
506         req->op = X_WRITE;
507         req->size = req->datalen;
508         req->offset = 0;
509
510         if (map->size % block_size)
511                 max_objidx++;
512         for (i = 0; i < max_objidx; i++) {
513                 mn = find_object(map, i);
514                 if (!mn){
515                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
516                                         (unsigned long long) i, map->volume);
517                         goto out_put;
518                 }
519                 object_to_map(data+pos, mn);
520                 pos += objectsize_in_map;
521         }
522         r = xseg_set_req_data(peer->xseg, req, pr);
523         if (r < 0){
524                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
525                                 map->volume);
526                 goto out_put;
527         }
528         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
529         if (p == NoPort){
530                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
531                                 map->volume);
532                 goto out_unset;
533         }
534         r = xseg_signal(peer->xseg, p);
535         if (r < 0)
536                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
537
538         map->flags |= MF_MAP_WRITING;
539         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
540         return MF_PENDING;
541
542 out_unset:
543         xseg_get_req_data(peer->xseg, req, &dummy);
544 out_put:
545         xseg_put_request(peer->xseg, req, pr->portno);
546 out_err:
547         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
548         return -1;
549 }
550
551 static int read_map (struct peerd *peer, struct map *map, char *buf)
552 {
553         char nulls[SHA256_DIGEST_SIZE];
554         memset(nulls, 0, SHA256_DIGEST_SIZE);
555
556         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
557         if (r) {
558                 //read error;
559                 return -1;
560         }
561         //type 1, our type, type 0 pithos map
562         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
563         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
564         uint64_t pos;
565         uint64_t i, nr_objs;
566         struct map_node *map_node;
567         if (type) {
568                 pos = SHA256_DIGEST_SIZE;
569                 map->size = *(uint64_t *) (buf + pos);
570                 pos += sizeof(uint64_t);
571                 nr_objs = map->size / block_size;
572                 if (map->size % block_size)
573                         nr_objs++;
574                 map_node = calloc(nr_objs, sizeof(struct map_node));
575                 if (!map_node)
576                         return -1;
577
578                 for (i = 0; i < nr_objs; i++) {
579                         map_node[i].map = map;
580                         map_node[i].objectidx = i;
581                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
582                         (void) qidx;
583                         map_to_object(&map_node[i], buf + pos);
584                         pos += objectsize_in_map;
585                         r = insert_object(map, &map_node[i]); //FIXME error check
586                 }
587         } else {
588                 pos = 0;
589                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
590                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
591                 if (!map_node)
592                         return -1;
593                 for (i = 0; i < max_nr_objs; i++) {
594                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
595                                 break;
596                         map_node[i].objectidx = i;
597                         map_node[i].map = map;
598                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
599                         (void) qidx;
600                         pithosmap_to_object(&map_node[i], buf + pos);
601                         pos += SHA256_DIGEST_SIZE; 
602                         r = insert_object(map, &map_node[i]); //FIXME error check
603                 }
604                 map->size = i * block_size; 
605         }
606         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
607         return 0;
608
609         //FIXME cleanup on error
610 }
611
612 /*
613  * copy up functions
614  */
615
616 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
617 {
618         int r = 0;
619         /*
620         if (mn){
621                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
622                 if (r == -XHASH_ERESIZE) {
623                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
624                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
625                         if (!new_hashmap)
626                                 goto out;
627                         mio->copyups_nodes = new_hashmap;
628                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
629                 }
630         }
631         else {
632                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
633                 if (r == -XHASH_ERESIZE) {
634                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
635                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
636                         if (!new_hashmap)
637                                 goto out;
638                         mio->copyups_nodes = new_hashmap;
639                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
640                 }
641         }
642 out:
643         */
644         mio->copyup_node = mn;
645         return r;
646 }
647
648 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
649 {
650         /*
651         struct map_node *mn;
652         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
653         if (r < 0)
654                 return NULL;
655         return mn;
656         */
657         return mio->copyup_node;
658 }
659
660 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
661 {
662         struct mapperd *mapper = __get_mapperd(peer);
663         struct mapper_io *mio = __get_mapper_io(pr);
664         struct map *map = mn->map;
665         void *dummy;
666         int r = -1, i;
667         xport p;
668
669         //struct sha256_ctx sha256ctx;
670         uint32_t newtargetlen;
671         char new_target[XSEG_MAX_TARGETLEN + 1]; 
672         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
673         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
674         strncpy(new_object, map->volume, map->volumelen);
675         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
676         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
677
678         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
679         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
680                 sprintf (new_target + 2*i, "%02x", buf[i]);
681         newtargetlen = SHA256_DIGEST_SIZE  * 2;
682
683         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
684                 goto copyup_zeroblock;
685
686         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
687                                                         mapper->bportno, X_ALLOC);
688         if (!req)
689                 goto out_err;
690         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
691                                 sizeof(struct xseg_request_copy));
692         if (r < 0)
693                 goto out_put;
694
695         char *target = xseg_get_target(peer->xseg, req);
696         strncpy(target, new_target, req->targetlen);
697
698         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
699         strncpy(xcopy->target, mn->object, mn->objectlen);
700         xcopy->targetlen = mn->objectlen;
701
702         req->offset = 0;
703         req->size = block_size;
704         req->op = X_COPY;
705         r = xseg_set_req_data(peer->xseg, req, pr);
706         if (r<0)
707                 goto out_put;
708         r = __set_copyup_node(mio, req, mn);
709         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
710         if (p == NoPort) {
711                 goto out_unset;
712         }
713         xseg_signal(peer->xseg, p);
714         mio->copyups++;
715
716         mn->flags |= MF_OBJECT_COPYING;
717         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
718         return 0;
719
720 out_unset:
721         r = __set_copyup_node(mio, req, NULL);
722         xseg_get_req_data(peer->xseg, req, &dummy);
723 out_put:
724         xseg_put_request(peer->xseg, req, pr->portno);
725 out_err:
726         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
727         return -1;
728
729 copyup_zeroblock:
730         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
731                         "Proceeding in writing the new object in map");
732         /* construct a tmp map_node for writing purposes */
733         struct map_node newmn = *mn;
734         newmn.flags = MF_OBJECT_EXIST;
735         strncpy(newmn.object, new_target, newtargetlen);
736         newmn.object[newtargetlen] = 0;
737         newmn.objectlen = newtargetlen;
738         newmn.objectidx = mn->objectidx; 
739         r = __set_copyup_node(mio, req, mn);
740         r = object_write(peer, pr, map, &newmn);
741         if (r != MF_PENDING){
742                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
743                                 "\n\t of map %s [%llu]",
744                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
745                 return -1;
746         }
747         mn->flags |= MF_OBJECT_WRITING;
748         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
749         return 0;
750 }
751
752 /*
753  * request handling functions
754  */
755
756 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
757                                 struct xseg_request *req)
758 {
759         int r;
760         xqindex idx;
761         char buf[XSEG_MAX_TARGETLEN];
762         struct mapperd *mapper = __get_mapperd(peer);
763         //assert req->op = X_READ;
764         char *target = xseg_get_target(peer->xseg, req);
765         struct map *map = find_map(mapper, target, req->targetlen);
766         if (!map)
767                 goto out_err;
768         //assert map->flags & MF_MAP_LOADING
769
770         if (req->state & XS_FAILED)
771                 goto out_fail;
772
773         char *data = xseg_get_data(peer->xseg, req);
774         r = read_map(peer, map, data);
775         if (r < 0)
776                 goto out_fail;
777         
778         xseg_put_request(peer->xseg, req, pr->portno);
779         map->flags &= ~MF_MAP_LOADING;
780         XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
781         uint64_t qsize = xq_count(&map->pending);
782         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
783                 qsize--;
784                 struct peer_req *preq = (struct peer_req *) idx;
785                 dispatch(peer, preq, preq->req, internal);
786         }
787         return 0;
788
789 out_fail:
790         XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
791         xseg_put_request(peer->xseg, req, pr->portno);
792         map->flags &= ~MF_MAP_LOADING;
793         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
794                 struct peer_req *preq = (struct peer_req *) idx;
795                 fail(peer, preq);
796         }
797         remove_map(mapper, map);
798         //FIXME not freeing up all objects + object hash
799         free(map);
800         return 0;
801
802 out_err:
803         strncpy(buf, target, req->targetlen);
804         buf[req->targetlen] = 0;
805         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
806         xseg_put_request(peer->xseg, req, pr->portno);
807         return -1;
808 }
809
810 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
811                                 struct xseg_request *req)
812 {
813         xqindex idx;
814         char buf[XSEG_MAX_TARGETLEN];
815         struct mapperd *mapper = __get_mapperd(peer);
816         //assert req->op = X_WRITE;
817         char *target = xseg_get_target(peer->xseg, req);
818         struct map *map = find_map(mapper, target, req->targetlen);
819         if (!map) {
820                 fprintf(stderr, "couldn't find map\n");
821                 goto out_err;
822         }
823         //assert map->flags & MF_MAP_WRITING
824
825         if (req->state & XS_FAILED){
826                 fprintf(stderr, "write request failed\n");
827                 goto out_fail;
828         }
829         
830         xseg_put_request(peer->xseg, req, pr->portno);
831         map->flags &= ~MF_MAP_WRITING;
832         XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
833         uint64_t qsize = xq_count(&map->pending);
834         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
835                 qsize--;
836                 struct peer_req *preq = (struct peer_req *) idx;
837                 dispatch(peer, preq, preq->req, internal);
838         }
839         return 0;
840
841
842 out_fail:
843         XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
844         xseg_put_request(peer->xseg, req, pr->portno);
845         map->flags &= ~MF_MAP_WRITING;
846         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
847                 struct peer_req *preq = (struct peer_req *) idx;
848                 fail(peer, preq);
849         }
850         remove_map(mapper, map);
851         //FIXME not freeing up all objects + object hash
852         free(map);
853         return 0;
854
855 out_err:
856         strncpy(buf, target, req->targetlen);
857         buf[req->targetlen] = 0;
858         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
859         xseg_put_request(peer->xseg, req, pr->portno);
860         return -1;
861 }
862
863 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
864                                 struct xseg_request *req)
865 {
866         struct mapperd *mapper = __get_mapperd(peer);
867         struct mapper_io *mio = __get_mapper_io(pr);
868         (void) mio;
869         int r;
870         char buf[XSEG_MAX_TARGETLEN + 1];
871         char *target;
872
873         if (pr->req->op != X_CLONE) {
874                 //wtf??
875                 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
876                 fail(peer, pr);
877                 return 0;
878         }
879
880         if (req->op == X_WRITE){
881                         //assert state = WRITING;
882                         r = handle_mapwrite(peer, pr ,req);
883                         if (r < 0){
884                                 XSEGLOG2(&lc, E, "handle mapwrite returned error");
885                                 fail(peer, pr);
886                         }
887                         return 0;
888         }
889
890         if (mio->state == WRITING) {
891                 target = xseg_get_target(peer->xseg, pr->req);
892                 strncpy(buf, target, req->targetlen);
893                 buf[req->targetlen] = 0;
894                 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
895                 complete(peer, pr);
896                 return 0;
897         }
898
899         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
900         if (!xclone) {
901                 goto out_err;
902         }
903         struct map *map;
904         r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
905         if (r < 0){
906                 goto out_err;
907         }
908         else if (r == MF_PENDING)
909                 return 0;
910         
911         if (map->flags & MF_MAP_DESTROYED) {
912                 strncpy(buf, xclone->target, xclone->targetlen);
913                 buf[xclone->targetlen] = 0;
914                 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
915                 target = xseg_get_target(peer->xseg, pr->req);
916                 strncpy(buf, target, req->targetlen);
917                 buf[req->targetlen] = 0;
918                 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
919                 fail(peer, pr);
920                 return 0;
921         }
922
923         struct map *clonemap = malloc(sizeof(struct map));
924         if (!clonemap) {
925                 goto out_err;
926         }
927         /*
928         FIXME check if clone map exists
929         find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
930         ... (on destroyed what ??
931         if (clonemap) {
932                 target = xseg_get_target(peer->xseg, pr->req);
933                 strncpy(buf, target, req->targetlen);
934                 buf[req->targetlen] = 0;
935                 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
936                 fail(peer, pr);
937                 return 0;
938         }
939         */
940         //alloc and init struct map
941         clonemap->objects = xhash_new(3, INTEGER);
942         if (!clonemap->objects){
943                 goto out_err_clonemap;
944         }
945         xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
946         if (!qidx){
947                 goto out_err_objhash;
948         }
949         if (xclone->size < map->size) {
950                 target = xseg_get_target(peer->xseg, pr->req);
951                 strncpy(buf, target, req->targetlen);
952                 buf[req->targetlen] = 0;
953                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
954                                 "\n\t for requested clone %s",
955                                 (unsigned long long) xclone->size,
956                                 (unsigned long long) map->size, buf);
957                 goto out_err_q;
958         }
959         if (xclone->size == -1)
960                 clonemap->size = map->size;
961         else
962                 clonemap->size = xclone->size;
963         clonemap->flags = 0;
964         target = xseg_get_target(peer->xseg, pr->req);
965         strncpy(clonemap->volume, target, pr->req->targetlen);
966         clonemap->volumelen = pr->req->targetlen;
967         clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
968
969         //alloc and init map_nodes
970         unsigned long c = clonemap->size/block_size + 1;
971         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
972         if (!map_nodes){
973                 goto out_err_q;
974         }
975         int i;
976         for (i = 0; i < clonemap->size/block_size + 1; i++) {
977                 struct map_node *mn = find_object(map, i);
978                 if (mn) {
979                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
980                         map_nodes[i].objectlen = mn->objectlen;
981                 } else {
982                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
983                         map_nodes[i].objectlen = strlen(zero_block);
984                 }
985                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
986                 map_nodes[i].flags = 0;
987                 map_nodes[i].objectidx = i;
988                 map_nodes[i].map = clonemap;
989                 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
990                 r = insert_object(clonemap, &map_nodes[i]);
991                 if (r < 0){
992                         goto out_free_all;
993                 }
994         }
995         //insert map
996         r = insert_map(mapper, clonemap);
997         if ( r < 0) {
998                 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
999                 goto out_free_all;
1000         }
1001         r = map_write(peer, pr, clonemap);
1002         if (r < 0){
1003                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1004                 goto out_remove;
1005         }
1006         else if (r == MF_PENDING) {
1007                 //maybe move this to map_write
1008                 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1009                 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1010                 mio->state = WRITING;
1011                 return 0;
1012         } else {
1013                 //unknown state
1014                 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
1015                 goto out_remove;
1016         }
1017         
1018         return 0;
1019
1020 out_remove:
1021         remove_map(mapper, clonemap);
1022 out_free_all:
1023         //FIXME not freeing allocated queues of map_nodes
1024         free(map_nodes);
1025 out_err_q:
1026         xq_free(&clonemap->pending);
1027 out_err_objhash:
1028         xhash_free(clonemap->objects);
1029 out_err_clonemap:
1030         free(clonemap);
1031 out_err:
1032         target = xseg_get_target(peer->xseg, pr->req);
1033         strncpy(buf, target, req->targetlen);
1034         buf[req->targetlen] = 0;
1035         XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
1036         fail(peer, pr);
1037         return -1;
1038 }
1039
1040 static int req2objs(struct peerd *peer, struct peer_req *pr, 
1041                                         struct map *map, int write)
1042 {
1043         char *target = xseg_get_target(peer->xseg, pr->req);
1044         uint32_t nr_objs = calc_nr_obj(pr->req);
1045         uint64_t size = sizeof(struct xseg_reply_map) + 
1046                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1047
1048         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1049         /* resize request to fit reply */
1050         char buf[XSEG_MAX_TARGETLEN];
1051         strncpy(buf, target, pr->req->targetlen);
1052         int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1053         if (r < 0) {
1054                 XSEGLOG2(&lc, E, "Cannot resize request");
1055                 return -1;
1056         }
1057         target = xseg_get_target(peer->xseg, pr->req);
1058         strncpy(target, buf, pr->req->targetlen);
1059
1060         /* structure reply */
1061         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1062         reply->cnt = nr_objs;
1063
1064         uint32_t idx = 0;
1065         uint64_t rem_size = pr->req->size;
1066         uint64_t obj_index = pr->req->offset / block_size;
1067         uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1068         uint64_t obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1069         struct map_node * mn = find_object(map, obj_index);
1070         if (!mn) {
1071                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1072                 goto out_err;
1073         }
1074         if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1075                 goto out_object_copying;
1076         if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1077                 //calc new_target, copy up object
1078                 r = copyup_object(peer, mn, pr);
1079                 if (r < 0) {
1080                         XSEGLOG2(&lc, E, "Error in copy up object");
1081                         goto out_err_copy;
1082                 }
1083                 goto out_object_copying;
1084         }
1085
1086 //      XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n", 
1087 //                              (unsigned long long) pr->req->offset, 
1088 //                              (unsigned long long) pr->req->size, 
1089 //                              block_size);
1090         strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1091         reply->segs[idx].targetlen = mn->objectlen;
1092         reply->segs[idx].offset = obj_offset;
1093         reply->segs[idx].size = obj_size;
1094 //      XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1095 //                                      (unsigned long long) reply->segs[idx].size,
1096 //                                      (unsigned long long) reply->segs[idx].offset);
1097         rem_size -= obj_size;
1098         while (rem_size > 0) {
1099                 idx++;
1100                 obj_index++;
1101                 obj_offset = 0;
1102                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1103                 rem_size -= obj_size;
1104                 mn = find_object(map, obj_index);
1105                 if (!mn) {
1106                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1107                         goto out_err;
1108                 }
1109                 if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1110                         goto out_object_copying;
1111                 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1112                         //calc new_target, copy up object
1113                         r = copyup_object(peer, mn, pr);
1114                         if (r < 0) {
1115                                 XSEGLOG2(&lc, E, "Error in copy up object");
1116                                 goto out_err_copy;
1117                         }
1118                         goto out_object_copying;
1119                 }
1120                 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1121                 reply->segs[idx].targetlen = mn->objectlen;
1122                 reply->segs[idx].offset = obj_offset;
1123                 reply->segs[idx].size = obj_size;
1124 //              XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1125 //                              (unsigned long long) reply->segs[idx].size,
1126 //                              (unsigned long long) reply->segs[idx].offset);
1127         }
1128         if (reply->cnt != (idx + 1)){
1129                 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
1130                 goto out_err;
1131         }
1132
1133         return 0;
1134
1135 out_object_copying:
1136         //printf("r2o mn: %lx\n", mn);
1137         //printf("volume %s pending on %s\n", map->volume, mn->object);
1138         //assert write
1139         if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1140                 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1141         XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1142                         mn->object, (unsigned long long) mn->objectidx, map->volume);
1143         return MF_PENDING;
1144
1145 out_err_copy:
1146 out_err:
1147         return -1;
1148 }
1149
1150 static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
1151                                 struct xseg_request *req)
1152 {
1153         struct mapperd *mapper = __get_mapperd(peer);
1154         struct mapper_io *mio = __get_mapper_io(pr);
1155         (void)mapper;
1156         (void)mio;
1157         //get_map
1158         char *target = xseg_get_target(peer->xseg, pr->req);
1159         struct map *map;
1160         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1161         if (r < 0) {
1162                 fail(peer, pr);
1163                 return -1;
1164         }
1165         else if (r == MF_PENDING)
1166                 return 0;
1167         
1168         if (map->flags & MF_MAP_DESTROYED) {
1169                 fail(peer, pr);
1170                 return 0;
1171         }
1172         
1173         //get_object
1174         r = req2objs(peer, pr, map, 0);
1175         if  (r < 0){
1176                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1177                                 map->volume, 
1178                                 (unsigned long long) pr->req->offset, 
1179                                 (unsigned long long) (pr->req->offset + pr->req->size));
1180                 fail(peer, pr);
1181         }
1182         else if (r == 0)
1183                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1184                                 map->volume, 
1185                                 (unsigned long long) pr->req->offset, 
1186                                 (unsigned long long) (pr->req->offset + pr->req->size));
1187                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1188                                 (unsigned long long) req->offset,
1189                                 (unsigned long long) req->size);
1190                 char buf[XSEG_MAX_TARGETLEN+1];
1191                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1192                 int i;
1193                 for (i = 0; i < reply->cnt; i++) {
1194                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1195                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1196                         buf[reply->segs[i].targetlen] = 0;
1197                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1198                                         (unsigned long long) reply->segs[i].offset,
1199                                         (unsigned long long) reply->segs[i].size);
1200                 }
1201                 complete(peer, pr);
1202
1203         return 0;
1204
1205
1206 }
1207
1208 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1209                                 struct xseg_request *req)
1210 {
1211         struct mapperd *mapper = __get_mapperd(peer);
1212         (void) mapper;
1213         struct mapper_io *mio = __get_mapper_io(pr);
1214         int r = 0;
1215         xqindex idx;
1216         struct map_node *mn = __get_copyup_node(mio, req);
1217         if (!mn)
1218                 goto out_err;
1219
1220         mn->flags &= ~MF_OBJECT_COPYING;
1221         if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1222                 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1223                 goto out_fail;
1224         }
1225         struct map *map = mn->map;
1226         if (!map){
1227                 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1228                 goto out_fail;
1229         }
1230         
1231         /* construct a tmp map_node for writing purposes */
1232         char *target = xseg_get_target(peer->xseg, req);
1233         struct map_node newmn = *mn;
1234         newmn.flags = MF_OBJECT_EXIST;
1235         strncpy(newmn.object, target, req->targetlen);
1236         newmn.object[req->targetlen] = 0;
1237         newmn.objectlen = req->targetlen;
1238         newmn.objectidx = mn->objectidx; 
1239         r = object_write(peer, pr, map, &newmn);
1240         if (r != MF_PENDING){
1241                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1242                                 "\n\t of map %s [%llu]",
1243                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1244                 goto out_fail;
1245         }
1246         mn->flags |= MF_OBJECT_WRITING;
1247         xseg_put_request(peer->xseg, req, pr->portno);
1248         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1249         return 0;
1250
1251 out_fail:
1252         xseg_put_request(peer->xseg, req, pr->portno);
1253         __set_copyup_node(mio, req, NULL);
1254         while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1255                 struct peer_req * preq = (struct peer_req *) idx;
1256                 fail(peer, preq);
1257         }
1258         return 0;
1259
1260 out_err:
1261         XSEGLOG2(&lc, E, "Cannot get map node");
1262         return -1;
1263 }
1264
1265 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1266                                 struct xseg_request *req)
1267 {
1268         xqindex idx;
1269         struct mapperd *mapper = __get_mapperd(peer);
1270         struct mapper_io *mio = __get_mapper_io(pr);
1271         //assert req->op = X_WRITE;
1272         char *target = xseg_get_target(peer->xseg, req);
1273         (void)target;
1274         (void)mapper;
1275         //printf("handle object write replyi\n");
1276         struct map_node *mn = __get_copyup_node(mio, req);
1277         if (!mn)
1278                 goto out_err;
1279         
1280         __set_copyup_node(mio, req, NULL);
1281         
1282         //assert mn->flags & MF_OBJECT_WRITING
1283         mn->flags &= ~MF_OBJECT_WRITING;
1284         if (req->state & XS_FAILED)
1285                 goto out_fail;
1286
1287         struct map_node tmp;
1288         char *data = xseg_get_data(peer->xseg, req);
1289         map_to_object(&tmp, data);
1290         mn->flags |= MF_OBJECT_EXIST;
1291         if (mn->flags != MF_OBJECT_EXIST){
1292                 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1293                 return *(int *) 0;
1294         }
1295         //assert mn->flags & MF_OBJECT_EXIST
1296         strncpy(mn->object, tmp.object, tmp.objectlen);
1297         mn->object[tmp.objectlen] = 0;
1298         mn->objectlen = tmp.objectlen;
1299         xseg_put_request(peer->xseg, req, pr->portno);
1300
1301         XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1302         uint64_t qsize = xq_count(&mn->pending);
1303         while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1304                 qsize--;
1305                 struct peer_req * preq = (struct peer_req *) idx;
1306                 dispatch(peer, preq, preq->req, internal);
1307         }
1308         return 0;
1309
1310 out_fail:
1311         XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1312         xseg_put_request(peer->xseg, req, pr->portno);
1313         while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1314                 struct peer_req *preq = (struct peer_req *) idx;
1315                 fail(peer, preq);
1316         }
1317         return 0;
1318
1319 out_err:
1320         XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1321         xseg_put_request(peer->xseg, req, pr->portno);
1322         return -1;
1323 }
1324
1325 static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
1326                                 struct xseg_request *req)
1327 {
1328         struct mapperd *mapper = __get_mapperd(peer);
1329         struct mapper_io *mio = __get_mapper_io(pr);
1330         (void) mapper;
1331         (void) mio;
1332         /* handle copy up replies separately */
1333         if (req->op == X_COPY){
1334                 if (handle_copyup(peer, pr, req) < 0){
1335                         XSEGLOG2(&lc, E, "Handle copy up returned error");
1336                         fail(peer, pr);
1337                         return -1;
1338                 } else {
1339                         return 0;
1340                 }
1341         }
1342         else if(req->op == X_WRITE){
1343                 /* handle replies of object write operations */
1344                 if (handle_objectwrite(peer, pr, req) < 0) {
1345                         XSEGLOG2(&lc, E, "Handle object write returned error");
1346                         fail(peer, pr);
1347                         return -1;
1348                 } else {
1349                         return 0;
1350                 }
1351         }
1352
1353         char *target = xseg_get_target(peer->xseg, pr->req);
1354         struct map *map;
1355         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1356         if (r < 0) {
1357                 fail(peer, pr);
1358                 return -1;
1359         }
1360         else if (r == MF_PENDING)
1361                 return 0;
1362         
1363         if (map->flags & MF_MAP_DESTROYED) {
1364                 fail(peer, pr);
1365                 return 0;
1366         }
1367
1368         r = req2objs(peer, pr, map, 1);
1369         if (r < 0){
1370                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1371                                 map->volume, 
1372                                 (unsigned long long) pr->req->offset, 
1373                                 (unsigned long long) (pr->req->offset + pr->req->size));
1374                 fail(peer, pr);
1375         }
1376         if (r == 0){
1377                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1378                                 map->volume, 
1379                                 (unsigned long long) pr->req->offset, 
1380                                 (unsigned long long) (pr->req->offset + pr->req->size));
1381                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1382                                 (unsigned long long) req->offset,
1383                                 (unsigned long long) req->size);
1384                 char buf[XSEG_MAX_TARGETLEN+1];
1385                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1386                 int i;
1387                 for (i = 0; i < reply->cnt; i++) {
1388                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1389                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1390                         buf[reply->segs[i].targetlen] = 0;
1391                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1392                                         (unsigned long long) reply->segs[i].offset,
1393                                         (unsigned long long) reply->segs[i].size);
1394                 }
1395                 complete(peer, pr);
1396         }
1397         //else copyup pending, wait for pr restart
1398
1399         return 0;
1400 }
1401
1402 static int handle_snap(struct peerd *peer, struct peer_req *pr, 
1403                                 struct xseg_request *req)
1404 {
1405         fail(peer, pr);
1406         return 0;
1407 }
1408
1409 static int handle_info(struct peerd *peer, struct peer_req *pr, 
1410                                 struct xseg_request *req)
1411 {
1412         struct mapperd *mapper = __get_mapperd(peer);
1413         struct mapper_io *mio = __get_mapper_io(pr);
1414         (void) mapper;
1415         (void) mio;
1416         char *target = xseg_get_target(peer->xseg, pr->req);
1417         if (!target) {
1418                 fail(peer, pr);
1419                 return 0;
1420         }
1421         //printf("Handle info\n");
1422         struct map *map;
1423         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1424         if (r < 0) {
1425                 fail(peer, pr);
1426                 return -1;
1427         }
1428         else if (r == MF_PENDING)
1429                 return 0;
1430         if (map->flags & MF_MAP_DESTROYED) {
1431                 fail(peer, pr);
1432                 return 0;
1433         }
1434         
1435         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1436         xinfo->size = map->size;
1437         complete(peer, pr);
1438
1439         return 0;
1440 }
1441
1442 static int delete_object(struct peerd *peer, struct peer_req *pr,
1443                                 struct map_node *mn)
1444 {
1445         void *dummy;
1446         struct mapperd *mapper = __get_mapperd(peer);
1447         struct mapper_io *mio = __get_mapper_io(pr);
1448
1449         mio->delobj = mn->objectidx;
1450         if (xq_count(&mn->pending) != 0) {
1451                 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1452                 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1453                                 mn->object);
1454                 return MF_PENDING;
1455         }
1456
1457         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1458                                                         mapper->bportno, X_ALLOC);
1459         if (!req)
1460                 goto out_err;
1461         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1462         if (r < 0)
1463                 goto out_put;
1464         char *target = xseg_get_target(peer->xseg, req);
1465         strncpy(target, mn->object, req->targetlen);
1466         req->op = X_DELETE;
1467         req->size = req->datalen;
1468         req->offset = 0;
1469
1470         r = xseg_set_req_data(peer->xseg, req, pr);
1471         if (r < 0)
1472                 goto out_put;
1473         __set_copyup_node(mio, req, mn);
1474         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1475         if (p == NoPort)
1476                 goto out_unset;
1477         r = xseg_signal(peer->xseg, p);
1478         mn->flags |= MF_OBJECT_DELETING;
1479         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1480         return MF_PENDING;
1481
1482 out_unset:
1483         xseg_get_req_data(peer->xseg, req, &dummy);
1484 out_put:
1485         xseg_put_request(peer->xseg, req, pr->portno);
1486 out_err:
1487         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1488         return -1;
1489 }
1490
1491 /*
1492  * Find next object for deletion. Start searching on idx mio->delobj.
1493  * Skip non existing map_nodes, free_resources and skip non-existing objects
1494  * Wait for all pending operations on the object, before moving forward to the 
1495  * next object.
1496  *
1497  * Return MF_PENDING if theres is a pending operation on the next object
1498  * or zero if there is no next object
1499  */
1500 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
1501                                 struct map *map)
1502 {
1503         struct mapperd *mapper = __get_mapperd(peer);
1504         struct mapper_io *mio = __get_mapper_io(pr);
1505         uint64_t idx = mio->delobj;
1506         struct map_node *mn;
1507         int r;
1508 retry:
1509         while (idx < calc_map_obj(map)) {
1510                 mn = find_object(map, idx);
1511                 if (!mn) {
1512                         idx++;
1513                         goto retry;
1514                 }
1515                 mio->delobj = idx;
1516                 if (xq_count(&mn->pending) != 0) {
1517                         __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1518                         XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1519                                         mn->object);
1520                         return MF_PENDING;
1521                 }
1522                 if (mn->flags & MF_OBJECT_EXIST){
1523                         r = delete_object(peer, pr, mn);
1524                         if (r < 0) {
1525                                 /* on error, just log it, release resources and
1526                                  * proceed to the next object
1527                                  */
1528                                 XSEGLOG2(&lc, E, "Object %s delete object return error"
1529                                                 "\n\t Map: %s [%llu]", 
1530                                                 mn->object, mn->map->volume, 
1531                                                 (unsigned long long) mn->objectidx);
1532                                 xq_free(&mn->pending);
1533                         }
1534                         else if (r == MF_PENDING){
1535                                 return r;
1536                         }
1537                 } else {
1538                         xq_free(&mn->pending);
1539                 }
1540                 idx++;
1541         }
1542         return 0;
1543 }
1544
1545 static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
1546                                  struct map_node *mn, int err)
1547 {
1548         struct mapperd *mapper = __get_mapperd(peer);
1549         struct mapper_io *mio = __get_mapper_io(pr);
1550         uint64_t idx;
1551         struct map *map = mn->map;
1552         int r;
1553         (void) mio;
1554         //if object deletion failed, map deletion must continue
1555         //and report OK, since map block has been deleted succesfully
1556         //so, no check for err
1557
1558         //assert object flags OK
1559         //free map_node_resources
1560         mn->flags &= ~MF_OBJECT_DELETING;
1561         xq_free(&mn->pending);
1562
1563         mio->delobj++;
1564         r = delete_next_object(peer, pr, map);
1565         if (r != MF_PENDING){
1566                 /* if there is no next object to delete, remove the map block
1567                  * from memory
1568                  */
1569
1570                 //assert map flags OK
1571                 map->flags |= MF_MAP_DESTROYED;
1572                 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1573                 //make all pending requests on map to fail
1574                 uint64_t qsize = xq_count(&map->pending);
1575                 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1576                         qsize--;
1577                         struct peer_req * preq = (struct peer_req *) idx;
1578                         dispatch(peer, preq, preq->req, internal);
1579                 }
1580                 //free map resources;
1581                 remove_map(mapper, map);
1582                 mn = find_object(map, 0);
1583                 free(mn);
1584                 xq_free(&map->pending);
1585                 free(map);
1586         }
1587         XSEGLOG2(&lc, I, "Handle object delete OK");
1588         return 0;
1589 }
1590
1591 static int delete_map(struct peerd *peer, struct peer_req *pr,
1592                         struct map *map)
1593 {
1594         void *dummy;
1595         struct mapperd *mapper = __get_mapperd(peer);
1596         struct mapper_io *mio = __get_mapper_io(pr);
1597         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1598                                                         mapper->mbportno, X_ALLOC);
1599         if (!req)
1600                 goto out_err;
1601         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1602         if (r < 0)
1603                 goto out_put;
1604         char *target = xseg_get_target(peer->xseg, req);
1605         strncpy(target, map->volume, req->targetlen);
1606         req->op = X_DELETE;
1607         req->size = req->datalen;
1608         req->offset = 0;
1609
1610         r = xseg_set_req_data(peer->xseg, req, pr);
1611         if (r < 0)
1612                 goto out_put;
1613         __set_copyup_node(mio, req, NULL);
1614         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1615         if (p == NoPort)
1616                 goto out_unset;
1617         r = xseg_signal(peer->xseg, p);
1618         map->flags |= MF_MAP_DELETING;
1619         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1620         return MF_PENDING;
1621
1622 out_unset:
1623         xseg_get_req_data(peer->xseg, req, &dummy);
1624 out_put:
1625         xseg_put_request(peer->xseg, req, pr->portno);
1626 out_err:
1627         XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
1628         return -1;
1629 }
1630
1631 static int handle_map_delete(struct peerd *peer, struct peer_req *pr, 
1632                                 struct map *map, int err)
1633 {
1634         struct mapperd *mapper = __get_mapperd(peer);
1635         struct mapper_io *mio = __get_mapper_io(pr);
1636         xqindex idx;
1637         int r;
1638         (void) mio;
1639         map->flags &= ~MF_MAP_DELETING;
1640         if (err) {
1641                 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1642                 //dispatch all pending
1643                 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1644                         struct peer_req * preq = (struct peer_req *) idx;
1645                         dispatch(peer, preq, preq->req, internal);
1646                 }
1647         } else {
1648                 map->flags |= MF_MAP_DESTROYED;
1649                 //delete all objects
1650                 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1651                 mio->delobj = 0;
1652                 r = delete_next_object(peer, pr, map);
1653                 if (r != MF_PENDING){
1654                         /* if there is no next object to delete, remove the map block
1655                          * from memory
1656                          */
1657                         //assert map flags OK
1658                         map->flags |= MF_MAP_DESTROYED;
1659                         XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1660                         //make all pending requests on map to fail
1661                         uint64_t qsize = xq_count(&map->pending);
1662                         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1663                                 qsize--;
1664                                 struct peer_req * preq = (struct peer_req *) idx;
1665                                 dispatch(peer, preq, preq->req, internal);
1666                         }
1667                         //free map resources;
1668                         remove_map(mapper, map);
1669                         struct map_node *mn = find_object(map, 0);
1670                         if (mn)
1671                                 free(mn);
1672                         xq_free(&map->pending);
1673                         free(map);
1674                 }
1675         }
1676         return 0;
1677 }
1678
1679 static int handle_delete(struct peerd *peer, struct peer_req *pr, 
1680                                 struct xseg_request *req)
1681 {
1682         struct mapperd *mapper = __get_mapperd(peer);
1683         struct mapper_io *mio = __get_mapper_io(pr);
1684         struct map_node *mn;
1685         struct map *map;
1686         int err = 0;
1687         if (req->state & XS_FAILED && !(req->state &XS_SERVED)) 
1688                 err = 1;
1689         
1690         mn = __get_copyup_node(mio, req);
1691         __set_copyup_node(mio, req, NULL);
1692         char *target = xseg_get_target(peer->xseg, req);
1693         if (!mn) {
1694                 //map block delete
1695                 map = find_map(mapper, target, req->targetlen);
1696                 if (!map) {
1697                         xseg_put_request(peer->xseg, req, pr->portno);
1698                         return -1;
1699                 }
1700                 handle_map_delete(peer, pr, map, err);
1701         } else {
1702                 //object delete
1703                 map = mn->map;
1704                 if (!map) {
1705                         xseg_put_request(peer->xseg, req, pr->portno);
1706                         return -1;
1707                 }
1708                 handle_object_delete(peer, pr, mn, err);
1709         }
1710         xseg_put_request(peer->xseg, req, pr->portno);
1711         return 0;
1712 }
1713
1714 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
1715                                 struct xseg_request *req)
1716 {
1717         struct mapperd *mapper = __get_mapperd(peer);
1718         struct mapper_io *mio = __get_mapper_io(pr);
1719         (void) mapper;
1720         int r;
1721         char buf[XSEG_MAX_TARGETLEN+1];
1722         char *target = xseg_get_target(peer->xseg, pr->req);
1723
1724         strncpy(buf, target, pr->req->targetlen);
1725         buf[req->targetlen] = 0;
1726
1727         XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1728                         (unsigned long) pr, (unsigned long) pr->req,
1729                         (unsigned long) req);
1730         XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
1731         if (pr->req != req && req->op == X_DELETE) {
1732                 //assert mio->state == DELETING
1733                 r = handle_delete(peer, pr, req);
1734                 if (r < 0) {
1735                         XSEGLOG2(&lc, E, "Handle delete returned error");
1736                         fail(peer, pr);
1737                         return -1;
1738                 } else {
1739                         return 0;
1740                 }
1741         }
1742
1743         struct map *map;
1744         r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1745         if (r < 0) {
1746                 fail(peer, pr);
1747                 return -1;
1748         }
1749         else if (r == MF_PENDING)
1750                 return 0;
1751         if (map->flags & MF_MAP_DESTROYED) {
1752                 if (mio->state == DELETING){
1753                         XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1754                         complete(peer, pr);
1755                 }
1756                 else{
1757                         XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1758                         fail(peer, pr);
1759                 }
1760                 return 0;
1761         }
1762         if (mio->state == DELETING) {
1763                 //continue deleting map objects;
1764                 r = delete_next_object(peer ,pr, map);
1765                 if (r != MF_PENDING){
1766                         complete(peer, pr);
1767                 }
1768                 return 0;
1769         }
1770         //delete map block
1771         r = delete_map(peer, pr, map);
1772         if (r < 0) {
1773                 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1774                 fail(peer, pr);
1775                 return -1;
1776         } else if (r == MF_PENDING) {
1777                 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1778                 __xq_append_tail(&map->pending, (xqindex) pr);
1779                 mio->state = DELETING;
1780                 return 0;
1781         }
1782         //unreachable
1783         XSEGLOG2(&lc, E, "Destroy unreachable");
1784         fail(peer, pr);
1785         return 0;
1786 }
1787
1788 static int handle_dropcache(struct peerd *peer, struct peer_req *pr, 
1789                                 struct xseg_request *req)
1790 {
1791         struct mapperd *mapper = __get_mapperd(peer);
1792         struct mapper_io *mio = __get_mapper_io(pr);
1793         (void) mapper;
1794         (void) mio;
1795         char *target = xseg_get_target(peer->xseg, pr->req);
1796         if (!target) {
1797                 fail(peer, pr);
1798                 return 0;
1799         }
1800
1801         struct map *map = find_map(mapper, target, pr->req->targetlen);
1802         if (!map){
1803                 complete(peer, pr);
1804                 return 0;
1805         } else if (map->flags & MF_MAP_DESTROYED) {
1806                 complete(peer, pr);
1807                 return 0;
1808         } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1809                 __xq_append_tail(&map->pending, (xqindex) pr);
1810                 return 0;
1811         }
1812
1813         if (mio->state != DROPPING_CACHE) {
1814                 /* block all future operations on the map */
1815                 map->flags |= MF_MAP_DROPPING_CACHE;
1816                 mio->dcobj = 0;
1817                 mio->state = DROPPING_CACHE;
1818                 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1819         } else {
1820                 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1821         }
1822
1823         struct map_node *mn; 
1824         uint64_t i;
1825         for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1826                 mn = find_object(map, i);
1827                 if (!mn)
1828                         continue;
1829                 mio->dcobj = i;
1830                 if (xq_count(&mn->pending) != 0){
1831                         XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu", 
1832                                 map->volume, (unsigned long long) mn->objectidx);
1833                         __xq_append_tail(&mn->pending, (xqindex) pr);
1834                         return 0;
1835                 }
1836                 xq_free(&mn->pending);
1837                 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu", 
1838                                 map->volume, (unsigned long long) mn->objectidx);
1839         }
1840         remove_map(mapper, map);
1841         //dispatch pending
1842         uint64_t qsize = xq_count(&map->pending);
1843         while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1844                 qsize--;
1845                 struct peer_req * preq = (struct peer_req *) i;
1846                 dispatch(peer, preq, preq->req, internal);
1847         }
1848         XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1849         
1850         //free map resources;
1851         mn = find_object(map, 0);
1852         if (mn)
1853                 free(mn);
1854         xq_free(&map->pending);
1855         free(map);
1856
1857         complete(peer, pr);
1858
1859         return 0;
1860 }
1861
1862 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1863                 enum dispatch_reason reason)
1864 {
1865         struct mapperd *mapper = __get_mapperd(peer);
1866         (void) mapper;
1867         struct mapper_io *mio = __get_mapper_io(pr);
1868         (void) mio;
1869
1870         if (reason == accept)
1871                 mio->state = ACCEPTED;
1872         
1873         if (req->op == X_READ) {
1874                 /* catch map reads requests here */
1875                 handle_mapread(peer, pr, req);
1876                 return 0;
1877         }
1878
1879         switch (pr->req->op) {
1880                 /* primary xseg operations of mapper */
1881                 case X_CLONE: handle_clone(peer, pr, req); break;
1882                 case X_MAPR: handle_mapr(peer, pr, req); break;
1883                 case X_MAPW: handle_mapw(peer, pr, req); break;
1884 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1885                 case X_INFO: handle_info(peer, pr, req); break;
1886                 case X_DELETE: handle_destroy(peer, pr, req); break;
1887                 case X_CLOSE: handle_dropcache(peer, pr, req); break;
1888                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1889         }
1890         return 0;
1891 }
1892
1893 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1894 {
1895         int i;
1896         unsigned char buf[SHA256_DIGEST_SIZE];
1897         char *zero;
1898
1899         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1900
1901         /* Version check should be the very first call because it
1902           makes sure that important subsystems are intialized. */
1903         gcry_check_version (NULL);
1904      
1905         /* Disable secure memory.  */
1906         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1907      
1908         /* Tell Libgcrypt that initialization has completed. */
1909         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1910
1911         /* calculate out magic sha hash value */
1912         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1913
1914         /* calculate zero block */
1915         //FIXME check hash value
1916         zero = malloc(block_size);
1917         memset(zero, 0, block_size);
1918         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1919         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1920                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1921         printf("%s \n", zero_block);
1922         free(zero);
1923
1924         //FIXME error checks
1925         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1926         mapper->hashmaps = xhash_new(3, STRING);
1927         peer->priv = mapper;
1928         
1929         for (i = 0; i < peer->nr_ops; i++) {
1930                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1931                 mio->copyups_nodes = xhash_new(3, INTEGER);
1932                 mio->copyups = 0;
1933                 mio->err = 0;
1934                 peer->peer_reqs[i].priv = mio;
1935         }
1936
1937         for (i = 0; i < argc; i++) {
1938                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1939                         mapper->bportno = atoi(argv[i+1]);
1940                         i += 1;
1941                         continue;
1942                 }
1943                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1944                         mapper->mbportno = atoi(argv[i+1]);
1945                         i += 1;
1946                         continue;
1947                 }
1948                 /* enforce only one thread */
1949                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1950                         int t = atoi(argv[i+1]);
1951                         if (t != 1) {
1952                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1953                                 return -1;
1954                         }
1955                         i += 1;
1956                         continue;
1957                 }
1958         }
1959
1960         const struct sched_param param = { .sched_priority = 99 };
1961         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1962
1963
1964 //      test_map(peer);
1965
1966         return 0;
1967 }
1968
1969 void print_obj(struct map_node *mn)
1970 {
1971         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
1972                         (unsigned long long) mn->objectidx, mn->object, 
1973                         (unsigned int) mn->objectlen, 
1974                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
1975 }
1976
1977 void print_map(struct map *m)
1978 {
1979         uint64_t nr_objs = m->size/block_size;
1980         if (m->size % block_size)
1981                 nr_objs++;
1982         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
1983                         m->volume, m->volumelen, 
1984                         (unsigned long long) m->size, 
1985                         (unsigned long long) nr_objs);
1986         uint64_t i;
1987         struct map_node *mn;
1988         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1989                 return;
1990         for (i = 0; i < nr_objs; i++) {
1991                 mn = find_object(m, i);
1992                 if (!mn){
1993                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
1994                         continue;
1995                 }
1996                 print_obj(mn);
1997         }
1998 }
1999
2000 /*
2001 void test_map(struct peerd *peer)
2002 {
2003         int i,j, ret;
2004         //struct sha256_ctx sha256ctx;
2005         unsigned char buf[SHA256_DIGEST_SIZE];
2006         char buf_new[XSEG_MAX_TARGETLEN + 20];
2007         struct map *m = malloc(sizeof(struct map));
2008         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2009         m->volume[XSEG_MAX_TARGETLEN] = 0;
2010         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2011         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2012         m->volumelen = XSEG_MAX_TARGETLEN;
2013         m->size = 100*block_size;
2014         m->objects = xhash_new(3, INTEGER);
2015         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2016         for (i = 0; i < 100; i++) {
2017                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2018                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2019                 
2020                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2021                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2022                 }
2023                 map_node[i].objectidx = i;
2024                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2025                 map_node[i].flags = MF_OBJECT_EXIST;
2026                 ret = insert_object(m, &map_node[i]);
2027         }
2028
2029         char *data = malloc(block_size);
2030         mapheader_to_map(m, data);
2031         uint64_t pos = mapheader_size;
2032
2033         for (i = 0; i < 100; i++) {
2034                 map_node = find_object(m, i);
2035                 if (!map_node){
2036                         printf("no object node %d \n", i);
2037                         exit(1);
2038                 }
2039                 object_to_map(data+pos, map_node);
2040                 pos += objectsize_in_map;
2041         }
2042 //      print_map(m);
2043
2044         struct map *m2 = malloc(sizeof(struct map));
2045         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2046         m->volume[XSEG_MAX_TARGETLEN] = 0;
2047         m->volumelen = XSEG_MAX_TARGETLEN;
2048
2049         m2->objects = xhash_new(3, INTEGER);
2050         ret = read_map(peer, m2, data);
2051 //      print_map(m2);
2052
2053         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2054         ssize_t r, sum = 0;
2055         while (sum < block_size) {
2056                 r = write(fd, data + sum, block_size -sum);
2057                 if (r < 0){
2058                         perror("write");
2059                         printf("write error\n");
2060                         exit(1);
2061                 } 
2062                 sum += r;
2063         }
2064         close(fd);
2065         map_node = find_object(m, 0);
2066         free(map_node);
2067         free(m);
2068 }
2069 */