2042e0c24e9ed443927e8099c9be5d932d2191a3
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <mpeer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15
16 GCRY_THREAD_OPTION_PTHREAD_IMPL;
17
18 #define MF_PENDING 1
19
20 #define SHA256_DIGEST_SIZE 32
21 /* hex representation of sha256 value takes up double the sha256 size */
22 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
23
24 #define block_size (1<<22) //FIXME this should be defined here?
25 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
26 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
27
28 #define MF_OBJECT_EXIST         (1 << 0)
29 #define MF_OBJECT_COPYING       (1 << 1)
30 #define MF_OBJECT_WRITING       (1 << 2)
31 #define MF_OBJECT_DELETING      (1 << 3)
32
33 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
34 extern struct log_ctx lc;
35
36 char *magic_string = "This a magic string. Please hash me";
37 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
38 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
39
40 //internal mapper states
41 enum mapper_state {
42         ACCEPTED = 0,
43         WRITING = 1,
44         COPYING = 2,
45         DELETING = 3,
46         DROPPING_CACHE = 4
47 };
48
49 struct map_node {
50         uint32_t flags;
51         uint32_t objectidx;
52         uint32_t objectlen;
53         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
54         struct xq pending;                      /* pending peer_reqs on this object */
55         struct map *map;
56 };
57
58 #define MF_MAP_LOADING          (1 << 0)
59 #define MF_MAP_DESTROYED        (1 << 1)
60 #define MF_MAP_WRITING          (1 << 2)
61 #define MF_MAP_DELETING         (1 << 3)
62 #define MF_MAP_DROPPING_CACHE   (1 << 4)
63
64 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
65                                         MF_MAP_DROPPING_CACHE)
66
67 struct map {
68         uint32_t flags;
69         uint64_t size;
70         uint32_t volumelen;
71         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
72         xhash_t *objects;       /* obj_index --> map_node */
73         struct xq pending;      /* pending peer_reqs on this map */
74 };
75
76 struct mapperd {
77         xport bportno;          /* blocker that accesses data */
78         xport mbportno;         /* blocker that accesses maps */
79         xhash_t *hashmaps; // hash_function(target) --> struct map
80 };
81
82 struct mapper_io {
83         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
84         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
85         struct map_node *copyup_node;
86         int err;                        /* error flag */
87         uint64_t delobj;
88         uint64_t dcobj;
89         enum mapper_state state;
90 };
91
92 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req);
93 void print_map(struct map *m);
94
95 /*
96  * Helper functions
97  */
98
99 static inline struct mapperd * __get_mapperd(struct peerd *peer)
100 {
101         return (struct mapperd *) peer->priv;
102 }
103
104 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
105 {
106         return (struct mapper_io *) pr->priv;
107 }
108
109 static inline uint64_t calc_map_obj(struct map *map)
110 {
111         uint64_t nr_objs = map->size / block_size;
112         if (map->size % block_size)
113                 nr_objs++;
114         return nr_objs;
115 }
116
117 static uint32_t calc_nr_obj(struct xseg_request *req)
118 {
119         unsigned int r = 1;
120         uint64_t rem_size = req->size;
121         uint64_t obj_offset = req->offset & (block_size -1); //modulo
122         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
123         rem_size -= obj_size;
124         while (rem_size > 0) {
125                 obj_size = (rem_size > block_size) ? block_size : rem_size;
126                 rem_size -= obj_size;
127                 r++;
128         }
129
130         return r;
131 }
132
133 /*
134  * Maps handling functions
135  */
136
137 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
138 {
139         int r;
140         struct map *m = NULL;
141         char buf[XSEG_MAX_TARGETLEN+1];
142         //assert targetlen <= XSEG_MAX_TARGETLEN
143         strncpy(buf, target, targetlen);
144         buf[targetlen] = 0;
145         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
146         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
147         if (r < 0)
148                 return NULL;
149         return m;
150 }
151
152
153 static int insert_map(struct mapperd *mapper, struct map *map)
154 {
155         int r = -1;
156         
157         if (find_map(mapper, map->volume, map->volumelen)){
158                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
159                 goto out;
160         }
161
162         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
163                         map->volume, strlen(map->volume), (unsigned long) map);
164         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
165         while (r == -XHASH_ERESIZE) {
166                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
167                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
168                 if (!new_hashmap){
169                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
170                                         (unsigned long long) shift);
171                         goto out;
172                 }
173                 mapper->hashmaps = new_hashmap;
174                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
175         }
176 out:
177         return r;
178 }
179
180 static int remove_map(struct mapperd *mapper, struct map *map)
181 {
182         int r = -1;
183         
184         //assert no pending pr on map
185         
186         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
187         while (r == -XHASH_ERESIZE) {
188                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
189                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
190                 if (!new_hashmap){
191                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
192                                         (unsigned long long) shift);
193                         goto out;
194                 }
195                 mapper->hashmaps = new_hashmap;
196                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
197         }
198 out:
199         return r;
200 }
201
202 /* async map load */
203 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, 
204                         uint32_t targetlen)
205 {
206         int r;
207         xport p;
208         struct xseg_request *req;
209         struct mapperd *mapper = __get_mapperd(peer);
210         void *dummy;
211
212         struct map *m = find_map(mapper, target, targetlen);
213         if (!m) {
214                 m = malloc(sizeof(struct map));
215                 if (!m){
216                         XSEGLOG2(&lc, E, "Cannot allocate map ");
217                         goto out_err;
218                 }
219                 m->size = -1;
220                 strncpy(m->volume, target, targetlen);
221                 m->volume[targetlen] = 0;
222                 m->volumelen = targetlen;
223                 m->flags = MF_MAP_LOADING;
224                 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
225                 if (!qidx) {
226                         XSEGLOG2(&lc, E, "Cannot allocate pending queue for map %s",
227                                         m->volume);
228                         goto out_map;
229                 }
230                 m->objects = xhash_new(3, INTEGER); 
231                 if (!m->objects){
232                         XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
233                                         m->volume);
234                         goto out_q;
235                 }
236                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME err check
237         } else {
238                 goto map_exists;
239         }
240
241         r = insert_map(mapper, m);
242         if (r < 0)  
243                 goto out_hash;
244         
245
246         req = xseg_get_request(peer->xseg, peer->portno, mapper->mbportno, X_ALLOC);
247         if (!req){
248                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
249                                 m->volume);
250                 goto out_fail;
251         }
252
253         r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
254         if (r < 0){
255                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
256                                 m->volume);
257                 goto out_put;
258         }
259
260         char *reqtarget = xseg_get_target(peer->xseg, req);
261         if (!reqtarget)
262                 goto out_put;
263         strncpy(reqtarget, target, req->targetlen);
264         req->op = X_READ;
265         req->size = block_size;
266         req->offset = 0;
267         r = xseg_set_req_data(peer->xseg, req, pr);
268         if (r < 0){
269                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
270                                 m->volume);
271                 goto out_put;
272         }
273         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
274         if (p == NoPort){ 
275                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
276                                 m->volume);
277                 goto out_unset;
278         }
279         r = xseg_signal(peer->xseg, p);
280         
281         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
282         return 0;
283
284 out_unset:
285         xseg_get_req_data(peer->xseg, req, &dummy);
286 out_put:
287         xseg_put_request(peer->xseg, req, peer->portno);
288
289 out_fail:
290         remove_map(mapper, m);
291         xqindex idx;
292         while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
293                 fail(peer, (struct peer_req *) idx);
294         }
295
296 out_hash:
297         xhash_free(m->objects);
298 out_q:
299         xq_free(&m->pending);
300 out_map:
301         XSEGLOG2(&lc, E, "failed to load map %s", m->volume);
302         free(m);
303 out_err:
304         return -1;
305
306 map_exists:
307         //assert map loading when this is reached
308         if (m->flags & MF_MAP_LOADING) {
309                 XSEGLOG2(&lc, I, "Map %s already exists and loading. "
310                                 "Adding to pending queue", m->volume);
311                 __xq_append_tail(&m->pending, (xqindex) pr); //FIXME errcheck
312         }
313         else {
314                 XSEGLOG2(&lc, I, "Map %s already exists and loaded. Dispatching.", m->volume);
315                 my_dispatch(peer, pr, pr->req);
316         }
317         return 0;
318 }
319
320
321 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
322                                 char *target, uint32_t targetlen, struct map **m)
323 {
324         struct mapperd *mapper = __get_mapperd(peer);
325         int r;
326         *m = find_map(mapper, target, targetlen);
327         if (*m) {
328                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
329                 if ((*m)->flags & MF_MAP_NOT_READY) {
330                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
331                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
332                         return MF_PENDING;
333                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
334                 //      return -1;
335                 // 
336                 }else {
337                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
338                         return 0;
339                 }
340         }
341         r = load_map(peer, pr, target, targetlen);
342         if (r < 0)
343                 return -1; //error
344         return MF_PENDING;      
345 }
346
347 /*
348  * Object handling functions
349  */
350
351 struct map_node *find_object(struct map *map, uint64_t obj_index)
352 {
353         struct map_node *mn;
354         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
355         if (r < 0)
356                 return NULL;
357         return mn;
358 }
359
360 static int insert_object(struct map *map, struct map_node *mn)
361 {
362         //FIXME no find object first
363         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
364         if (r == -XHASH_ERESIZE) {
365                 unsigned long shift = xhash_grow_size_shift(map->objects);
366                 map->objects = xhash_resize(map->objects, shift, NULL);
367                 if (!map->objects)
368                         return -1;
369                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
370         }
371         return r;
372 }
373
374
375 /*
376  * map read/write functions 
377  */
378 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
379 {
380         int i;
381         //hexlify sha256 value
382         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
383                 sprintf(mn->object+2*i, "%02x", buf[i]);
384         }
385
386         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
387         mn->objectlen = SHA256_DIGEST_SIZE * 2;
388         mn->flags = MF_OBJECT_EXIST;
389 }
390
391 static inline void map_to_object(struct map_node *mn, char *buf)
392 {
393         char c = buf[0];
394         mn->flags = 0;
395         if (c)
396                 mn->flags |= MF_OBJECT_EXIST;
397         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
398         mn->object[XSEG_MAX_TARGETLEN] = 0;
399         mn->objectlen = strlen(mn->object);
400 }
401
402 static inline void object_to_map(char* buf, struct map_node *mn)
403 {
404         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
405         memcpy(buf+1, mn->object, mn->objectlen);
406         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
407 }
408
409 static inline void mapheader_to_map(struct map *m, char *buf)
410 {
411         uint64_t pos = 0;
412         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
413         pos += SHA256_DIGEST_SIZE;
414         memcpy(buf + pos, &m->size, sizeof(m->size));
415         pos += sizeof(m->size);
416 }
417
418
419 static int object_write(struct peerd *peer, struct peer_req *pr, 
420                                 struct map *map, struct map_node *mn)
421 {
422         void *dummy;
423         struct mapperd *mapper = __get_mapperd(peer);
424         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
425                                                         mapper->mbportno, X_ALLOC);
426         if (!req){
427                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
428                                 "(Map: %s [%llu]",
429                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
430                 goto out_err;
431         }
432         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
433         if (r < 0){
434                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
435                                 "(Map: %s [%llu]",
436                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
437                 goto out_put;
438         }
439         char *target = xseg_get_target(peer->xseg, req);
440         strncpy(target, map->volume, req->targetlen);
441         req->size = objectsize_in_map;
442         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
443         req->op = X_WRITE;
444         char *data = xseg_get_data(peer->xseg, req);
445         object_to_map(data, mn);
446
447         r = xseg_set_req_data(peer->xseg, req, pr);
448         if (r < 0){
449                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
450                                 "(Map: %s [%llu]",
451                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
452                 goto out_put;
453         }
454         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
455         if (p == NoPort){
456                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
457                                 "(Map: %s [%llu]",
458                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
459                 goto out_unset;
460         }
461         r = xseg_signal(peer->xseg, p);
462         if (r < 0)
463                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
464
465         XSEGLOG2(&lc, I, "Writing object %s \n\t"
466                         "Map: %s [%llu]",
467                         mn->object, map->volume, (unsigned long long) mn->objectidx);
468
469         return MF_PENDING;
470
471 out_unset:
472         xseg_get_req_data(peer->xseg, req, &dummy);
473 out_put:
474         xseg_put_request(peer->xseg, req, peer->portno);
475 out_err:
476         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
477                         "(Map: %s [%llu]",
478                         mn->object, map->volume, (unsigned long long) mn->objectidx);
479         return -1;
480 }
481
482 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
483 {
484         void *dummy;
485         struct mapperd *mapper = __get_mapperd(peer);
486         struct map_node *mn;
487         uint64_t i, pos, max_objidx = calc_map_obj(map);
488         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
489                                                         mapper->mbportno, X_ALLOC);
490         if (!req){
491                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
492                 goto out_err;
493         }
494         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
495                                         mapheader_size + max_objidx * objectsize_in_map);
496         if (r < 0){
497                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
498                 goto out_put;
499         }
500         char *target = xseg_get_target(peer->xseg, req);
501         strncpy(target, map->volume, req->targetlen);
502         char *data = xseg_get_data(peer->xseg, req);
503         mapheader_to_map(map, data);
504         pos = mapheader_size;
505         req->op = X_WRITE;
506         req->size = req->datalen;
507         req->offset = 0;
508
509         if (map->size % block_size)
510                 max_objidx++;
511         for (i = 0; i < max_objidx; i++) {
512                 mn = find_object(map, i);
513                 if (!mn){
514                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
515                                         (unsigned long long) i, map->volume);
516                         goto out_put;
517                 }
518                 object_to_map(data+pos, mn);
519                 pos += objectsize_in_map;
520         }
521         r = xseg_set_req_data(peer->xseg, req, pr);
522         if (r < 0){
523                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
524                                 map->volume);
525                 goto out_put;
526         }
527         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
528         if (p == NoPort){
529                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
530                                 map->volume);
531                 goto out_unset;
532         }
533         r = xseg_signal(peer->xseg, p);
534         if (r < 0)
535                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
536
537         map->flags |= MF_MAP_WRITING;
538         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
539         return MF_PENDING;
540
541 out_unset:
542         xseg_get_req_data(peer->xseg, req, &dummy);
543 out_put:
544         xseg_put_request(peer->xseg, req, peer->portno);
545 out_err:
546         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
547         return -1;
548 }
549
550 static int read_map (struct peerd *peer, struct map *map, char *buf)
551 {
552         char nulls[SHA256_DIGEST_SIZE];
553         memset(nulls, 0, SHA256_DIGEST_SIZE);
554
555         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
556         if (r) {
557                 //read error;
558                 return -1;
559         }
560         //type 1, our type, type 0 pithos map
561         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
562         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
563         uint64_t pos;
564         uint64_t i, nr_objs;
565         struct map_node *map_node;
566         if (type) {
567                 pos = SHA256_DIGEST_SIZE;
568                 map->size = *(uint64_t *) (buf + pos);
569                 pos += sizeof(uint64_t);
570                 nr_objs = map->size / block_size;
571                 if (map->size % block_size)
572                         nr_objs++;
573                 map_node = calloc(nr_objs, sizeof(struct map_node));
574                 if (!map_node)
575                         return -1;
576
577                 for (i = 0; i < nr_objs; i++) {
578                         map_node[i].map = map;
579                         map_node[i].objectidx = i;
580                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
581                         (void) qidx;
582                         map_to_object(&map_node[i], buf + pos);
583                         pos += objectsize_in_map;
584                         r = insert_object(map, &map_node[i]); //FIXME error check
585                 }
586         } else {
587                 pos = 0;
588                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
589                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
590                 if (!map_node)
591                         return -1;
592                 for (i = 0; i < max_nr_objs; i++) {
593                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
594                                 break;
595                         map_node[i].objectidx = i;
596                         map_node[i].map = map;
597                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
598                         (void) qidx;
599                         pithosmap_to_object(&map_node[i], buf + pos);
600                         pos += SHA256_DIGEST_SIZE; 
601                         r = insert_object(map, &map_node[i]); //FIXME error check
602                 }
603                 map->size = i * block_size; 
604         }
605         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
606         return 0;
607
608         //FIXME cleanup on error
609 }
610
611 /*
612  * copy up functions
613  */
614
615 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
616 {
617         int r = 0;
618         /*
619         if (mn){
620                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
621                 if (r == -XHASH_ERESIZE) {
622                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
623                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
624                         if (!new_hashmap)
625                                 goto out;
626                         mio->copyups_nodes = new_hashmap;
627                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
628                 }
629         }
630         else {
631                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
632                 if (r == -XHASH_ERESIZE) {
633                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
634                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
635                         if (!new_hashmap)
636                                 goto out;
637                         mio->copyups_nodes = new_hashmap;
638                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
639                 }
640         }
641 out:
642         */
643         mio->copyup_node = mn;
644         return r;
645 }
646
647 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
648 {
649         /*
650         struct map_node *mn;
651         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
652         if (r < 0)
653                 return NULL;
654         return mn;
655         */
656         return mio->copyup_node;
657 }
658
659 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
660 {
661         struct mapperd *mapper = __get_mapperd(peer);
662         struct mapper_io *mio = __get_mapper_io(pr);
663         struct map *map = mn->map;
664         void *dummy;
665         int r = -1, i;
666         xport p;
667
668         //struct sha256_ctx sha256ctx;
669         uint32_t newtargetlen;
670         char new_target[XSEG_MAX_TARGETLEN + 1]; 
671         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
672         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
673         strncpy(new_object, map->volume, map->volumelen);
674         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
675         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
676
677         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
678         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
679                 sprintf (new_target + 2*i, "%02x", buf[i]);
680         newtargetlen = SHA256_DIGEST_SIZE  * 2;
681
682         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
683                 goto copyup_zeroblock;
684
685         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
686                                                         mapper->bportno, X_ALLOC);
687         if (!req)
688                 goto out_err;
689         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
690                                 sizeof(struct xseg_request_copy));
691         if (r < 0)
692                 goto out_put;
693
694         char *target = xseg_get_target(peer->xseg, req);
695         strncpy(target, new_target, req->targetlen);
696
697         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
698         strncpy(xcopy->target, mn->object, mn->objectlen);
699         xcopy->targetlen = mn->objectlen;
700
701         req->offset = 0;
702         req->size = block_size;
703         req->op = X_COPY;
704         r = xseg_set_req_data(peer->xseg, req, pr);
705         if (r<0)
706                 goto out_put;
707         r = __set_copyup_node(mio, req, mn);
708         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
709         if (p == NoPort) {
710                 goto out_unset;
711         }
712         xseg_signal(peer->xseg, p);
713         mio->copyups++;
714
715         mn->flags |= MF_OBJECT_COPYING;
716         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
717         return 0;
718
719 out_unset:
720         r = __set_copyup_node(mio, req, NULL);
721         xseg_get_req_data(peer->xseg, req, &dummy);
722 out_put:
723         xseg_put_request(peer->xseg, req, peer->portno);
724 out_err:
725         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
726         return -1;
727
728 copyup_zeroblock:
729         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
730                         "Proceeding in writing the new object in map");
731         /* construct a tmp map_node for writing purposes */
732         struct map_node newmn = *mn;
733         newmn.flags = MF_OBJECT_EXIST;
734         strncpy(newmn.object, new_target, newtargetlen);
735         newmn.object[newtargetlen] = 0;
736         newmn.objectlen = newtargetlen;
737         newmn.objectidx = mn->objectidx; 
738         r = __set_copyup_node(mio, req, mn);
739         r = object_write(peer, pr, map, &newmn);
740         if (r != MF_PENDING){
741                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
742                                 "\n\t of map %s [%llu]",
743                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
744                 return -1;
745         }
746         mn->flags |= MF_OBJECT_WRITING;
747         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
748         return 0;
749 }
750
751 /*
752  * request handling functions
753  */
754
755 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
756                                 struct xseg_request *req)
757 {
758         int r;
759         xqindex idx;
760         char buf[XSEG_MAX_TARGETLEN];
761         struct mapperd *mapper = __get_mapperd(peer);
762         //assert req->op = X_READ;
763         char *target = xseg_get_target(peer->xseg, req);
764         struct map *map = find_map(mapper, target, req->targetlen);
765         if (!map)
766                 goto out_err;
767         //assert map->flags & MF_MAP_LOADING
768
769         if (req->state & XS_FAILED)
770                 goto out_fail;
771
772         char *data = xseg_get_data(peer->xseg, req);
773         r = read_map(peer, map, data);
774         if (r < 0)
775                 goto out_fail;
776         
777         xseg_put_request(peer->xseg, req, peer->portno);
778         map->flags &= ~MF_MAP_LOADING;
779         XSEGLOG2(&lc, I, "Map %s loaded. Dispatching pending", map->volume);
780         uint64_t qsize = xq_count(&map->pending);
781         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
782                 qsize--;
783                 struct peer_req *preq = (struct peer_req *) idx;
784                 my_dispatch(peer, preq, preq->req);
785         }
786         return 0;
787
788 out_fail:
789         XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
790         xseg_put_request(peer->xseg, req, peer->portno);
791         map->flags &= ~MF_MAP_LOADING;
792         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
793                 struct peer_req *preq = (struct peer_req *) idx;
794                 fail(peer, preq);
795         }
796         remove_map(mapper, map);
797         //FIXME not freeing up all objects + object hash
798         free(map);
799         return 0;
800
801 out_err:
802         strncpy(buf, target, req->targetlen);
803         buf[req->targetlen] = 0;
804         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
805         xseg_put_request(peer->xseg, req, peer->portno);
806         return -1;
807 }
808
809 static int handle_mapwrite(struct peerd *peer, struct peer_req *pr,
810                                 struct xseg_request *req)
811 {
812         xqindex idx;
813         char buf[XSEG_MAX_TARGETLEN];
814         struct mapperd *mapper = __get_mapperd(peer);
815         //assert req->op = X_WRITE;
816         char *target = xseg_get_target(peer->xseg, req);
817         struct map *map = find_map(mapper, target, req->targetlen);
818         if (!map) {
819                 fprintf(stderr, "couldn't find map\n");
820                 goto out_err;
821         }
822         //assert map->flags & MF_MAP_WRITING
823
824         if (req->state & XS_FAILED){
825                 fprintf(stderr, "write request failed\n");
826                 goto out_fail;
827         }
828         
829         xseg_put_request(peer->xseg, req, peer->portno);
830         map->flags &= ~MF_MAP_WRITING;
831         XSEGLOG2(&lc, I, "Map %s written. Dispatching pending", map->volume);
832         uint64_t qsize = xq_count(&map->pending);
833         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
834                 qsize--;
835                 struct peer_req *preq = (struct peer_req *) idx;
836                 my_dispatch(peer, preq, preq->req);
837         }
838         return 0;
839
840
841 out_fail:
842         XSEGLOG2(&lc, E, "Map write for map %s failed", map->volume);
843         xseg_put_request(peer->xseg, req, peer->portno);
844         map->flags &= ~MF_MAP_WRITING;
845         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
846                 struct peer_req *preq = (struct peer_req *) idx;
847                 fail(peer, preq);
848         }
849         remove_map(mapper, map);
850         //FIXME not freeing up all objects + object hash
851         free(map);
852         return 0;
853
854 out_err:
855         strncpy(buf, target, req->targetlen);
856         buf[req->targetlen] = 0;
857         XSEGLOG2(&lc, E, "Cannot find map for request target %s", buf);
858         xseg_put_request(peer->xseg, req, peer->portno);
859         return -1;
860 }
861
862 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
863                                 struct xseg_request *req)
864 {
865         struct mapperd *mapper = __get_mapperd(peer);
866         struct mapper_io *mio = __get_mapper_io(pr);
867         (void) mio;
868         int r;
869         char buf[XSEG_MAX_TARGETLEN + 1];
870         char *target;
871
872         if (pr->req->op != X_CLONE) {
873                 //wtf??
874                 XSEGLOG2(&lc, E, "Unknown op %u", req->op);
875                 fail(peer, pr);
876                 return 0;
877         }
878
879         if (req->op == X_WRITE){
880                         //assert state = WRITING;
881                         r = handle_mapwrite(peer, pr ,req);
882                         if (r < 0){
883                                 XSEGLOG2(&lc, E, "handle mapwrite returned error");
884                                 fail(peer, pr);
885                         }
886                         return 0;
887         }
888
889         if (mio->state == WRITING) {
890                 target = xseg_get_target(peer->xseg, pr->req);
891                 strncpy(buf, target, req->targetlen);
892                 buf[req->targetlen] = 0;
893                 XSEGLOG2(&lc, I, "Completing clone request for map %s", buf);
894                 complete(peer, pr);
895                 return 0;
896         }
897
898         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
899         if (!xclone) {
900                 goto out_err;
901         }
902         struct map *map;
903         r = find_or_load_map(peer, pr, xclone->target, xclone->targetlen, &map);
904         if (r < 0){
905                 goto out_err;
906         }
907         else if (r == MF_PENDING)
908                 return 0;
909         
910         if (map->flags & MF_MAP_DESTROYED) {
911                 strncpy(buf, xclone->target, xclone->targetlen);
912                 buf[xclone->targetlen] = 0;
913                 XSEGLOG2(&lc, W, "Map %s destroyed", buf);
914                 target = xseg_get_target(peer->xseg, pr->req);
915                 strncpy(buf, target, req->targetlen);
916                 buf[req->targetlen] = 0;
917                 XSEGLOG2(&lc, W, "Cannont clone %s because base map destroyed", buf);
918                 fail(peer, pr);
919                 return 0;
920         }
921
922         struct map *clonemap = malloc(sizeof(struct map));
923         if (!clonemap) {
924                 goto out_err;
925         }
926         /*
927         FIXME check if clone map exists
928         find_or_load_map(peer, pr, target, req->targetlen, &clonemap)
929         ... (on destroyed what ??
930         if (clonemap) {
931                 target = xseg_get_target(peer->xseg, pr->req);
932                 strncpy(buf, target, req->targetlen);
933                 buf[req->targetlen] = 0;
934                 XSEGLOG2(&lc, W, "Map %s requested for clone exists", buf);
935                 fail(peer, pr);
936                 return 0;
937         }
938         */
939         //alloc and init struct map
940         clonemap->objects = xhash_new(3, INTEGER);
941         if (!clonemap->objects){
942                 goto out_err_clonemap;
943         }
944         xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
945         if (!qidx){
946                 goto out_err_objhash;
947         }
948         if (xclone->size < map->size) {
949                 target = xseg_get_target(peer->xseg, pr->req);
950                 strncpy(buf, target, req->targetlen);
951                 buf[req->targetlen] = 0;
952                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
953                                 "\n\t for requested clone %s",
954                                 (unsigned long long) xclone->size,
955                                 (unsigned long long) map->size, buf);
956                 goto out_err_q;
957         }
958         if (xclone->size == -1)
959                 clonemap->size = map->size;
960         else
961                 clonemap->size = xclone->size;
962         clonemap->flags = 0;
963         target = xseg_get_target(peer->xseg, pr->req);
964         strncpy(clonemap->volume, target, pr->req->targetlen);
965         clonemap->volumelen = pr->req->targetlen;
966         clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
967
968         //alloc and init map_nodes
969         unsigned long c = clonemap->size/block_size + 1;
970         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
971         if (!map_nodes){
972                 goto out_err_q;
973         }
974         int i;
975         for (i = 0; i < clonemap->size/block_size + 1; i++) {
976                 struct map_node *mn = find_object(map, i);
977                 if (mn) {
978                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
979                         map_nodes[i].objectlen = mn->objectlen;
980                 } else {
981                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2 ?
982                         map_nodes[i].objectlen = strlen(zero_block);
983                 }
984                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
985                 map_nodes[i].flags = 0;
986                 map_nodes[i].objectidx = i;
987                 map_nodes[i].map = clonemap;
988                 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
989                 r = insert_object(clonemap, &map_nodes[i]);
990                 if (r < 0){
991                         goto out_free_all;
992                 }
993         }
994         //insert map
995         r = insert_map(mapper, clonemap);
996         if ( r < 0) {
997                 XSEGLOG2(&lc, E, "Cannot insert map %s", clonemap->volume);
998                 goto out_free_all;
999         }
1000         r = map_write(peer, pr, clonemap);
1001         if (r < 0){
1002                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1003                 goto out_remove;
1004         }
1005         else if (r == MF_PENDING) {
1006                 //maybe move this to map_write
1007                 XSEGLOG2(&lc, I, "Writing map %s", clonemap->volume);
1008                 __xq_append_tail(&clonemap->pending, (xqindex) pr);
1009                 mio->state = WRITING;
1010                 return 0;
1011         } else {
1012                 //unknown state
1013                 XSEGLOG2(&lc, I, "Map write for map %s returned unknown value", clonemap->volume);
1014                 goto out_remove;
1015         }
1016         
1017         return 0;
1018
1019 out_remove:
1020         remove_map(mapper, clonemap);
1021 out_free_all:
1022         //FIXME not freeing allocated queues of map_nodes
1023         free(map_nodes);
1024 out_err_q:
1025         xq_free(&clonemap->pending);
1026 out_err_objhash:
1027         xhash_free(clonemap->objects);
1028 out_err_clonemap:
1029         free(clonemap);
1030 out_err:
1031         target = xseg_get_target(peer->xseg, pr->req);
1032         strncpy(buf, target, req->targetlen);
1033         buf[req->targetlen] = 0;
1034         XSEGLOG2(&lc, E, "Clone map for %s failed", buf);
1035         fail(peer, pr);
1036         return -1;
1037 }
1038
1039 static int req2objs(struct peerd *peer, struct peer_req *pr, 
1040                                         struct map *map, int write)
1041 {
1042         char *target = xseg_get_target(peer->xseg, pr->req);
1043         uint32_t nr_objs = calc_nr_obj(pr->req);
1044         uint64_t size = sizeof(struct xseg_reply_map) + 
1045                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1046
1047         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1048         /* resize request to fit reply */
1049         char buf[XSEG_MAX_TARGETLEN];
1050         strncpy(buf, target, pr->req->targetlen);
1051         int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1052         if (r < 0) {
1053                 XSEGLOG2(&lc, E, "Cannot resize request");
1054                 return -1;
1055         }
1056         target = xseg_get_target(peer->xseg, pr->req);
1057         strncpy(target, buf, pr->req->targetlen);
1058
1059         /* structure reply */
1060         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1061         reply->cnt = nr_objs;
1062
1063         uint32_t idx = 0;
1064         uint64_t rem_size = pr->req->size;
1065         uint64_t obj_index = pr->req->offset / block_size;
1066         uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
1067         uint64_t obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1068         struct map_node * mn = find_object(map, obj_index);
1069         if (!mn) {
1070                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1071                 goto out_err;
1072         }
1073         if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1074                 goto out_object_copying;
1075         if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1076                 //calc new_target, copy up object
1077                 r = copyup_object(peer, mn, pr);
1078                 if (r < 0) {
1079                         XSEGLOG2(&lc, E, "Error in copy up object");
1080                         goto out_err_copy;
1081                 }
1082                 goto out_object_copying;
1083         }
1084
1085 //      XSEGLOG2(&lc, D, "pr->req->offset: %llu, pr->req->size %llu, block_size %u\n", 
1086 //                              (unsigned long long) pr->req->offset, 
1087 //                              (unsigned long long) pr->req->size, 
1088 //                              block_size);
1089         strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1090         reply->segs[idx].targetlen = mn->objectlen;
1091         reply->segs[idx].offset = obj_offset;
1092         reply->segs[idx].size = obj_size;
1093 //      XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1094 //                                      (unsigned long long) reply->segs[idx].size,
1095 //                                      (unsigned long long) reply->segs[idx].offset);
1096         rem_size -= obj_size;
1097         while (rem_size > 0) {
1098                 idx++;
1099                 obj_index++;
1100                 obj_offset = 0;
1101                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1102                 rem_size -= obj_size;
1103                 mn = find_object(map, obj_index);
1104                 if (!mn) {
1105                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1106                         goto out_err;
1107                 }
1108                 if (write && (mn->flags & MF_OBJECT_NOT_READY)) 
1109                         goto out_object_copying;
1110                 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
1111                         //calc new_target, copy up object
1112                         r = copyup_object(peer, mn, pr);
1113                         if (r < 0) {
1114                                 XSEGLOG2(&lc, E, "Error in copy up object");
1115                                 goto out_err_copy;
1116                         }
1117                         goto out_object_copying;
1118                 }
1119                 strncpy(reply->segs[idx].target, mn->object, mn->objectlen);
1120                 reply->segs[idx].targetlen = mn->objectlen;
1121                 reply->segs[idx].offset = obj_offset;
1122                 reply->segs[idx].size = obj_size;
1123 //              XSEGLOG2(&lc, D, "Added object: %s, size: %llu, offset: %llu", mn->object,
1124 //                              (unsigned long long) reply->segs[idx].size,
1125 //                              (unsigned long long) reply->segs[idx].offset);
1126         }
1127         if (reply->cnt != (idx + 1)){
1128                 XSEGLOG2(&lc, E, "reply->cnt %u, idx+1: %u", reply->cnt, idx+1);
1129                 goto out_err;
1130         }
1131
1132         return 0;
1133
1134 out_object_copying:
1135         //printf("r2o mn: %lx\n", mn);
1136         //printf("volume %s pending on %s\n", map->volume, mn->object);
1137         //assert write
1138         if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
1139                 XSEGLOG2(&lc, E, "Cannot append pr to tail");
1140         XSEGLOG2(&lc, I, "object %s is pending \n\t idx:%llu of map %s",
1141                         mn->object, (unsigned long long) mn->objectidx, map->volume);
1142         return MF_PENDING;
1143
1144 out_err_copy:
1145 out_err:
1146         return -1;
1147 }
1148
1149 static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
1150                                 struct xseg_request *req)
1151 {
1152         struct mapperd *mapper = __get_mapperd(peer);
1153         struct mapper_io *mio = __get_mapper_io(pr);
1154         (void)mapper;
1155         (void)mio;
1156         //get_map
1157         char *target = xseg_get_target(peer->xseg, pr->req);
1158         struct map *map;
1159         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1160         if (r < 0) {
1161                 fail(peer, pr);
1162                 return -1;
1163         }
1164         else if (r == MF_PENDING)
1165                 return 0;
1166         
1167         if (map->flags & MF_MAP_DESTROYED) {
1168                 fail(peer, pr);
1169                 return 0;
1170         }
1171         
1172         //get_object
1173         r = req2objs(peer, pr, map, 0);
1174         if  (r < 0){
1175                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1176                                 map->volume, 
1177                                 (unsigned long long) pr->req->offset, 
1178                                 (unsigned long long) (pr->req->offset + pr->req->size));
1179                 fail(peer, pr);
1180         }
1181         else if (r == 0)
1182                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1183                                 map->volume, 
1184                                 (unsigned long long) pr->req->offset, 
1185                                 (unsigned long long) (pr->req->offset + pr->req->size));
1186                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1187                                 (unsigned long long) req->offset,
1188                                 (unsigned long long) req->size);
1189                 char buf[XSEG_MAX_TARGETLEN+1];
1190                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1191                 int i;
1192                 for (i = 0; i < reply->cnt; i++) {
1193                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1194                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1195                         buf[reply->segs[i].targetlen] = 0;
1196                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1197                                         (unsigned long long) reply->segs[i].offset,
1198                                         (unsigned long long) reply->segs[i].size);
1199                 }
1200                 complete(peer, pr);
1201
1202         return 0;
1203
1204
1205 }
1206
1207 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
1208                                 struct xseg_request *req)
1209 {
1210         struct mapperd *mapper = __get_mapperd(peer);
1211         (void) mapper;
1212         struct mapper_io *mio = __get_mapper_io(pr);
1213         int r = 0;
1214         xqindex idx;
1215         struct map_node *mn = __get_copyup_node(mio, req);
1216         if (!mn)
1217                 goto out_err;
1218
1219         mn->flags &= ~MF_OBJECT_COPYING;
1220         if (req->state & XS_FAILED && !(req->state & XS_SERVED)){
1221                 XSEGLOG2(&lc, E, "Copy up of object %s failed", mn->object);
1222                 goto out_fail;
1223         }
1224         struct map *map = mn->map;
1225         if (!map){
1226                 XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1227                 goto out_fail;
1228         }
1229         
1230         /* construct a tmp map_node for writing purposes */
1231         char *target = xseg_get_target(peer->xseg, req);
1232         struct map_node newmn = *mn;
1233         newmn.flags = MF_OBJECT_EXIST;
1234         strncpy(newmn.object, target, req->targetlen);
1235         newmn.object[req->targetlen] = 0;
1236         newmn.objectlen = req->targetlen;
1237         newmn.objectidx = mn->objectidx; 
1238         r = object_write(peer, pr, map, &newmn);
1239         if (r != MF_PENDING){
1240                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1241                                 "\n\t of map %s [%llu]",
1242                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1243                 goto out_fail;
1244         }
1245         mn->flags |= MF_OBJECT_WRITING;
1246         xseg_put_request(peer->xseg, req, peer->portno);
1247         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1248         return 0;
1249
1250 out_fail:
1251         xseg_put_request(peer->xseg, req, peer->portno);
1252         __set_copyup_node(mio, req, NULL);
1253         while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1254                 struct peer_req * preq = (struct peer_req *) idx;
1255                 fail(peer, preq);
1256         }
1257         return 0;
1258
1259 out_err:
1260         XSEGLOG2(&lc, E, "Cannot get map node");
1261         return -1;
1262 }
1263
1264 static int handle_objectwrite(struct peerd *peer, struct peer_req *pr,
1265                                 struct xseg_request *req)
1266 {
1267         xqindex idx;
1268         struct mapperd *mapper = __get_mapperd(peer);
1269         struct mapper_io *mio = __get_mapper_io(pr);
1270         //assert req->op = X_WRITE;
1271         char *target = xseg_get_target(peer->xseg, req);
1272         (void)target;
1273         (void)mapper;
1274         //printf("handle object write replyi\n");
1275         struct map_node *mn = __get_copyup_node(mio, req);
1276         if (!mn)
1277                 goto out_err;
1278         
1279         __set_copyup_node(mio, req, NULL);
1280         
1281         //assert mn->flags & MF_OBJECT_WRITING
1282         mn->flags &= ~MF_OBJECT_WRITING;
1283         if (req->state & XS_FAILED)
1284                 goto out_fail;
1285
1286         struct map_node tmp;
1287         char *data = xseg_get_data(peer->xseg, req);
1288         map_to_object(&tmp, data);
1289         mn->flags |= MF_OBJECT_EXIST;
1290         if (mn->flags != MF_OBJECT_EXIST){
1291                 XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1292                 return *(int *) 0;
1293         }
1294         //assert mn->flags & MF_OBJECT_EXIST
1295         strncpy(mn->object, tmp.object, tmp.objectlen);
1296         mn->object[tmp.objectlen] = 0;
1297         mn->objectlen = tmp.objectlen;
1298         xseg_put_request(peer->xseg, req, peer->portno);
1299
1300         XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1301         uint64_t qsize = xq_count(&mn->pending);
1302         while(qsize > 0 && (idx = __xq_pop_head(&mn->pending)) != Noneidx){
1303                 qsize--;
1304                 struct peer_req * preq = (struct peer_req *) idx;
1305                 my_dispatch(peer, preq, preq->req);
1306         }
1307         return 0;
1308
1309 out_fail:
1310         XSEGLOG2(&lc, E, "Write of object %s failed", mn->object);
1311         xseg_put_request(peer->xseg, req, peer->portno);
1312         while((idx = __xq_pop_head(&mn->pending)) != Noneidx){
1313                 struct peer_req *preq = (struct peer_req *) idx;
1314                 fail(peer, preq);
1315         }
1316         return 0;
1317
1318 out_err:
1319         XSEGLOG2(&lc, E, "Cannot find map node. Failure!");
1320         xseg_put_request(peer->xseg, req, peer->portno);
1321         return -1;
1322 }
1323
1324 static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
1325                                 struct xseg_request *req)
1326 {
1327         struct mapperd *mapper = __get_mapperd(peer);
1328         struct mapper_io *mio = __get_mapper_io(pr);
1329         (void) mapper;
1330         (void) mio;
1331         /* handle copy up replies separately */
1332         if (req->op == X_COPY){
1333                 if (handle_copyup(peer, pr, req) < 0){
1334                         XSEGLOG2(&lc, E, "Handle copy up returned error");
1335                         fail(peer, pr);
1336                         return -1;
1337                 } else {
1338                         return 0;
1339                 }
1340         }
1341         else if(req->op == X_WRITE){
1342                 /* handle replies of object write operations */
1343                 if (handle_objectwrite(peer, pr, req) < 0) {
1344                         XSEGLOG2(&lc, E, "Handle object write returned error");
1345                         fail(peer, pr);
1346                         return -1;
1347                 } else {
1348                         return 0;
1349                 }
1350         }
1351
1352         char *target = xseg_get_target(peer->xseg, pr->req);
1353         struct map *map;
1354         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1355         if (r < 0) {
1356                 fail(peer, pr);
1357                 return -1;
1358         }
1359         else if (r == MF_PENDING)
1360                 return 0;
1361         
1362         if (map->flags & MF_MAP_DESTROYED) {
1363                 fail(peer, pr);
1364                 return 0;
1365         }
1366
1367         r = req2objs(peer, pr, map, 1);
1368         if (r < 0){
1369                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1370                                 map->volume, 
1371                                 (unsigned long long) pr->req->offset, 
1372                                 (unsigned long long) (pr->req->offset + pr->req->size));
1373                 fail(peer, pr);
1374         }
1375         if (r == 0){
1376                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1377                                 map->volume, 
1378                                 (unsigned long long) pr->req->offset, 
1379                                 (unsigned long long) (pr->req->offset + pr->req->size));
1380                 XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1381                                 (unsigned long long) req->offset,
1382                                 (unsigned long long) req->size);
1383                 char buf[XSEG_MAX_TARGETLEN+1];
1384                 struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1385                 int i;
1386                 for (i = 0; i < reply->cnt; i++) {
1387                         XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1388                         strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1389                         buf[reply->segs[i].targetlen] = 0;
1390                         XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1391                                         (unsigned long long) reply->segs[i].offset,
1392                                         (unsigned long long) reply->segs[i].size);
1393                 }
1394                 complete(peer, pr);
1395         }
1396         //else copyup pending, wait for pr restart
1397
1398         return 0;
1399 }
1400
1401 static int handle_snap(struct peerd *peer, struct peer_req *pr, 
1402                                 struct xseg_request *req)
1403 {
1404         fail(peer, pr);
1405         return 0;
1406 }
1407
1408 static int handle_info(struct peerd *peer, struct peer_req *pr, 
1409                                 struct xseg_request *req)
1410 {
1411         struct mapperd *mapper = __get_mapperd(peer);
1412         struct mapper_io *mio = __get_mapper_io(pr);
1413         (void) mapper;
1414         (void) mio;
1415         char *target = xseg_get_target(peer->xseg, pr->req);
1416         if (!target) {
1417                 fail(peer, pr);
1418                 return 0;
1419         }
1420         //printf("Handle info\n");
1421         struct map *map;
1422         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1423         if (r < 0) {
1424                 fail(peer, pr);
1425                 return -1;
1426         }
1427         else if (r == MF_PENDING)
1428                 return 0;
1429         if (map->flags & MF_MAP_DESTROYED) {
1430                 fail(peer, pr);
1431                 return 0;
1432         }
1433         
1434         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1435         xinfo->size = map->size;
1436         complete(peer, pr);
1437
1438         return 0;
1439 }
1440
1441 static int delete_object(struct peerd *peer, struct peer_req *pr,
1442                                 struct map_node *mn)
1443 {
1444         void *dummy;
1445         struct mapperd *mapper = __get_mapperd(peer);
1446         struct mapper_io *mio = __get_mapper_io(pr);
1447
1448         mio->delobj = mn->objectidx;
1449         if (xq_count(&mn->pending) != 0) {
1450                 __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1451                 XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1452                                 mn->object);
1453                 return MF_PENDING;
1454         }
1455
1456         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
1457                                                         mapper->bportno, X_ALLOC);
1458         if (!req)
1459                 goto out_err;
1460         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1461         if (r < 0)
1462                 goto out_put;
1463         char *target = xseg_get_target(peer->xseg, req);
1464         strncpy(target, mn->object, req->targetlen);
1465         req->op = X_DELETE;
1466         req->size = req->datalen;
1467         req->offset = 0;
1468
1469         r = xseg_set_req_data(peer->xseg, req, pr);
1470         if (r < 0)
1471                 goto out_put;
1472         __set_copyup_node(mio, req, mn);
1473         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1474         if (p == NoPort)
1475                 goto out_unset;
1476         r = xseg_signal(peer->xseg, p);
1477         mn->flags |= MF_OBJECT_DELETING;
1478         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1479         return MF_PENDING;
1480
1481 out_unset:
1482         xseg_get_req_data(peer->xseg, req, &dummy);
1483 out_put:
1484         xseg_put_request(peer->xseg, req, peer->portno);
1485 out_err:
1486         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1487         return -1;
1488 }
1489
1490 /*
1491  * Find next object for deletion. Start searching on idx mio->delobj.
1492  * Skip non existing map_nodes, free_resources and skip non-existing objects
1493  * Wait for all pending operations on the object, before moving forward to the 
1494  * next object.
1495  *
1496  * Return MF_PENDING if theres is a pending operation on the next object
1497  * or zero if there is no next object
1498  */
1499 static int delete_next_object(struct peerd *peer, struct peer_req *pr,
1500                                 struct map *map)
1501 {
1502         struct mapperd *mapper = __get_mapperd(peer);
1503         struct mapper_io *mio = __get_mapper_io(pr);
1504         uint64_t idx = mio->delobj;
1505         struct map_node *mn;
1506         int r;
1507 retry:
1508         while (idx < calc_map_obj(map)) {
1509                 mn = find_object(map, idx);
1510                 if (!mn) {
1511                         idx++;
1512                         goto retry;
1513                 }
1514                 mio->delobj = idx;
1515                 if (xq_count(&mn->pending) != 0) {
1516                         __xq_append_tail(&mn->pending, (xqindex) pr); //FIXME err check
1517                         XSEGLOG2(&lc, I, "Object %s has pending requests. Adding to pending",
1518                                         mn->object);
1519                         return MF_PENDING;
1520                 }
1521                 if (mn->flags & MF_OBJECT_EXIST){
1522                         r = delete_object(peer, pr, mn);
1523                         if (r < 0) {
1524                                 /* on error, just log it, release resources and
1525                                  * proceed to the next object
1526                                  */
1527                                 XSEGLOG2(&lc, E, "Object %s delete object return error"
1528                                                 "\n\t Map: %s [%llu]", 
1529                                                 mn->object, mn->map->volume, 
1530                                                 (unsigned long long) mn->objectidx);
1531                                 xq_free(&mn->pending);
1532                         }
1533                         else if (r == MF_PENDING){
1534                                 return r;
1535                         }
1536                 } else {
1537                         xq_free(&mn->pending);
1538                 }
1539                 idx++;
1540         }
1541         return 0;
1542 }
1543
1544 static int handle_object_delete(struct peerd *peer, struct peer_req *pr, 
1545                                  struct map_node *mn, int err)
1546 {
1547         struct mapperd *mapper = __get_mapperd(peer);
1548         struct mapper_io *mio = __get_mapper_io(pr);
1549         uint64_t idx;
1550         struct map *map = mn->map;
1551         int r;
1552         (void) mio;
1553         //if object deletion failed, map deletion must continue
1554         //and report OK, since map block has been deleted succesfully
1555         //so, no check for err
1556
1557         //assert object flags OK
1558         //free map_node_resources
1559         mn->flags &= ~MF_OBJECT_DELETING;
1560         xq_free(&mn->pending);
1561
1562         mio->delobj++;
1563         r = delete_next_object(peer, pr, map);
1564         if (r != MF_PENDING){
1565                 /* if there is no next object to delete, remove the map block
1566                  * from memory
1567                  */
1568
1569                 //assert map flags OK
1570                 map->flags |= MF_MAP_DESTROYED;
1571                 XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1572                 //make all pending requests on map to fail
1573                 uint64_t qsize = xq_count(&map->pending);
1574                 while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1575                         qsize--;
1576                         struct peer_req * preq = (struct peer_req *) idx;
1577                         my_dispatch(peer, preq, preq->req);
1578                 }
1579                 //free map resources;
1580                 remove_map(mapper, map);
1581                 mn = find_object(map, 0);
1582                 free(mn);
1583                 xq_free(&map->pending);
1584                 free(map);
1585         }
1586         XSEGLOG2(&lc, I, "Handle object delete OK");
1587         return 0;
1588 }
1589
1590 static int delete_map(struct peerd *peer, struct peer_req *pr,
1591                         struct map *map)
1592 {
1593         void *dummy;
1594         struct mapperd *mapper = __get_mapperd(peer);
1595         struct mapper_io *mio = __get_mapper_io(pr);
1596         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
1597                                                         mapper->mbportno, X_ALLOC);
1598         if (!req)
1599                 goto out_err;
1600         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1601         if (r < 0)
1602                 goto out_put;
1603         char *target = xseg_get_target(peer->xseg, req);
1604         strncpy(target, map->volume, req->targetlen);
1605         req->op = X_DELETE;
1606         req->size = req->datalen;
1607         req->offset = 0;
1608
1609         r = xseg_set_req_data(peer->xseg, req, pr);
1610         if (r < 0)
1611                 goto out_put;
1612         __set_copyup_node(mio, req, NULL);
1613         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
1614         if (p == NoPort)
1615                 goto out_unset;
1616         r = xseg_signal(peer->xseg, p);
1617         map->flags |= MF_MAP_DELETING;
1618         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1619         return MF_PENDING;
1620
1621 out_unset:
1622         xseg_get_req_data(peer->xseg, req, &dummy);
1623 out_put:
1624         xseg_put_request(peer->xseg, req, peer->portno);
1625 out_err:
1626         XSEGLOG2(&lc, I, "Map %s deletion failed", map->volume);
1627         return -1;
1628 }
1629
1630 static int handle_map_delete(struct peerd *peer, struct peer_req *pr, 
1631                                 struct map *map, int err)
1632 {
1633         struct mapperd *mapper = __get_mapperd(peer);
1634         struct mapper_io *mio = __get_mapper_io(pr);
1635         xqindex idx;
1636         int r;
1637         (void) mio;
1638         map->flags &= ~MF_MAP_DELETING;
1639         if (err) {
1640                 XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1641                 //dispatch all pending
1642                 while ((idx = __xq_pop_head(&map->pending)) != Noneidx){
1643                         struct peer_req * preq = (struct peer_req *) idx;
1644                         my_dispatch(peer, preq, preq->req);
1645                 }
1646         } else {
1647                 map->flags |= MF_MAP_DESTROYED;
1648                 //delete all objects
1649                 XSEGLOG2(&lc, I, "Map %s map block deleted. Deleting objects", map->volume);
1650                 mio->delobj = 0;
1651                 r = delete_next_object(peer, pr, map);
1652                 if (r != MF_PENDING){
1653                         /* if there is no next object to delete, remove the map block
1654                          * from memory
1655                          */
1656                         //assert map flags OK
1657                         map->flags |= MF_MAP_DESTROYED;
1658                         XSEGLOG2(&lc, I, "Map %s deleted", map->volume);
1659                         //make all pending requests on map to fail
1660                         uint64_t qsize = xq_count(&map->pending);
1661                         while(qsize > 0 && (idx = __xq_pop_head(&map->pending)) != Noneidx){
1662                                 qsize--;
1663                                 struct peer_req * preq = (struct peer_req *) idx;
1664                                 my_dispatch(peer, preq, preq->req);
1665                         }
1666                         //free map resources;
1667                         remove_map(mapper, map);
1668                         struct map_node *mn = find_object(map, 0);
1669                         if (mn)
1670                                 free(mn);
1671                         xq_free(&map->pending);
1672                         free(map);
1673                 }
1674         }
1675         return 0;
1676 }
1677
1678 static int handle_delete(struct peerd *peer, struct peer_req *pr, 
1679                                 struct xseg_request *req)
1680 {
1681         struct mapperd *mapper = __get_mapperd(peer);
1682         struct mapper_io *mio = __get_mapper_io(pr);
1683         struct map_node *mn;
1684         struct map *map;
1685         int err = 0;
1686         if (req->state & XS_FAILED && !(req->state &XS_SERVED)) 
1687                 err = 1;
1688         
1689         mn = __get_copyup_node(mio, req);
1690         __set_copyup_node(mio, req, NULL);
1691         char *target = xseg_get_target(peer->xseg, req);
1692         if (!mn) {
1693                 //map block delete
1694                 map = find_map(mapper, target, req->targetlen);
1695                 if (!map) {
1696                         xseg_put_request(peer->xseg, req, peer->portno);
1697                         return -1;
1698                 }
1699                 handle_map_delete(peer, pr, map, err);
1700         } else {
1701                 //object delete
1702                 map = mn->map;
1703                 if (!map) {
1704                         xseg_put_request(peer->xseg, req, peer->portno);
1705                         return -1;
1706                 }
1707                 handle_object_delete(peer, pr, mn, err);
1708         }
1709         xseg_put_request(peer->xseg, req, peer->portno);
1710         return 0;
1711 }
1712
1713 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
1714                                 struct xseg_request *req)
1715 {
1716         struct mapperd *mapper = __get_mapperd(peer);
1717         struct mapper_io *mio = __get_mapper_io(pr);
1718         (void) mapper;
1719         int r;
1720         char buf[XSEG_MAX_TARGETLEN+1];
1721         char *target = xseg_get_target(peer->xseg, pr->req);
1722
1723         strncpy(buf, target, pr->req->targetlen);
1724         buf[req->targetlen] = 0;
1725
1726         XSEGLOG2(&lc, D, "Handle destroy pr: %lx, pr->req: %lx, req: %lx",
1727                         (unsigned long) pr, (unsigned long) pr->req,
1728                         (unsigned long) req);
1729         XSEGLOG2(&lc, D, "target: %s (%u)", buf, strlen(buf));
1730         if (pr->req != req && req->op == X_DELETE) {
1731                 //assert mio->state == DELETING
1732                 r = handle_delete(peer, pr, req);
1733                 if (r < 0) {
1734                         XSEGLOG2(&lc, E, "Handle delete returned error");
1735                         fail(peer, pr);
1736                         return -1;
1737                 } else {
1738                         return 0;
1739                 }
1740         }
1741
1742         struct map *map;
1743         r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
1744         if (r < 0) {
1745                 fail(peer, pr);
1746                 return -1;
1747         }
1748         else if (r == MF_PENDING)
1749                 return 0;
1750         if (map->flags & MF_MAP_DESTROYED) {
1751                 if (mio->state == DELETING){
1752                         XSEGLOG2(&lc, I, "Map %s destroyed", map->volume);
1753                         complete(peer, pr);
1754                 }
1755                 else{
1756                         XSEGLOG2(&lc, I, "Map %s already destroyed", map->volume);
1757                         fail(peer, pr);
1758                 }
1759                 return 0;
1760         }
1761         if (mio->state == DELETING) {
1762                 //continue deleting map objects;
1763                 r = delete_next_object(peer ,pr, map);
1764                 if (r != MF_PENDING){
1765                         complete(peer, pr);
1766                 }
1767                 return 0;
1768         }
1769         //delete map block
1770         r = delete_map(peer, pr, map);
1771         if (r < 0) {
1772                 XSEGLOG2(&lc, E, "Map delete for map %s returned error", map->volume);
1773                 fail(peer, pr);
1774                 return -1;
1775         } else if (r == MF_PENDING) {
1776                 XSEGLOG2(&lc, I, "Map %s delete pending", map->volume);
1777                 __xq_append_tail(&map->pending, (xqindex) pr);
1778                 mio->state = DELETING;
1779                 return 0;
1780         }
1781         //unreachable
1782         XSEGLOG2(&lc, E, "Destroy unreachable");
1783         fail(peer, pr);
1784         return 0;
1785 }
1786
1787 static int handle_dropcache(struct peerd *peer, struct peer_req *pr, 
1788                                 struct xseg_request *req)
1789 {
1790         struct mapperd *mapper = __get_mapperd(peer);
1791         struct mapper_io *mio = __get_mapper_io(pr);
1792         (void) mapper;
1793         (void) mio;
1794         char *target = xseg_get_target(peer->xseg, pr->req);
1795         if (!target) {
1796                 fail(peer, pr);
1797                 return 0;
1798         }
1799
1800         struct map *map = find_map(mapper, target, pr->req->targetlen);
1801         if (!map){
1802                 complete(peer, pr);
1803                 return 0;
1804         } else if (map->flags & MF_MAP_DESTROYED) {
1805                 complete(peer, pr);
1806                 return 0;
1807         } else if (map->flags & MF_MAP_NOT_READY && mio->state != DROPPING_CACHE) {
1808                 __xq_append_tail(&map->pending, (xqindex) pr);
1809                 return 0;
1810         }
1811
1812         if (mio->state != DROPPING_CACHE) {
1813                 /* block all future operations on the map */
1814                 map->flags |= MF_MAP_DROPPING_CACHE;
1815                 mio->dcobj = 0;
1816                 mio->state = DROPPING_CACHE;
1817                 XSEGLOG2(&lc, I, "Map %s start dropping cache", map->volume);
1818         } else {
1819                 XSEGLOG2(&lc, I, "Map %s continue dropping cache", map->volume);
1820         }
1821
1822         struct map_node *mn; 
1823         uint64_t i;
1824         for (i = mio->dcobj; i < calc_map_obj(map); i++) {
1825                 mn = find_object(map, i);
1826                 if (!mn)
1827                         continue;
1828                 mio->dcobj = i;
1829                 if (xq_count(&mn->pending) != 0){
1830                         XSEGLOG2(&lc, D, "Map %s pending dropping cache for obj idx: %llu", 
1831                                 map->volume, (unsigned long long) mn->objectidx);
1832                         __xq_append_tail(&mn->pending, (xqindex) pr);
1833                         return 0;
1834                 }
1835                 xq_free(&mn->pending);
1836                 XSEGLOG2(&lc, D, "Map %s dropped cache for obj idx: %llu", 
1837                                 map->volume, (unsigned long long) mn->objectidx);
1838         }
1839         remove_map(mapper, map);
1840         //dispatch pending
1841         uint64_t qsize = xq_count(&map->pending);
1842         while(qsize > 0 && (i = __xq_pop_head(&map->pending)) != Noneidx){
1843                 qsize--;
1844                 struct peer_req * preq = (struct peer_req *) i;
1845                 my_dispatch(peer, preq, preq->req);
1846         }
1847         XSEGLOG2(&lc, I, "Map %s droped cache", map->volume);
1848         
1849         //free map resources;
1850         mn = find_object(map, 0);
1851         if (mn)
1852                 free(mn);
1853         xq_free(&map->pending);
1854         free(map);
1855
1856         complete(peer, pr);
1857
1858         return 0;
1859 }
1860
1861 static int my_dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1862 {
1863         struct mapperd *mapper = __get_mapperd(peer);
1864         (void) mapper;
1865         struct mapper_io *mio = __get_mapper_io(pr);
1866         (void) mio;
1867
1868         if (req->op == X_READ) {
1869                 /* catch map reads requests here */
1870                 handle_mapread(peer, pr, req);
1871                 return 0;
1872         }
1873
1874         switch (pr->req->op) {
1875                 /* primary xseg operations of mapper */
1876                 case X_CLONE: handle_clone(peer, pr, req); break;
1877                 case X_MAPR: handle_mapr(peer, pr, req); break;
1878                 case X_MAPW: handle_mapw(peer, pr, req); break;
1879 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1880                 case X_INFO: handle_info(peer, pr, req); break;
1881                 case X_DELETE: handle_destroy(peer, pr, req); break;
1882                 case X_CLOSE: handle_dropcache(peer, pr, req); break;
1883                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1884         }
1885         return 0;
1886 }
1887
1888 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1889 {
1890         struct mapperd *mapper = __get_mapperd(peer);
1891         (void) mapper;
1892         struct mapper_io *mio = __get_mapper_io(pr);
1893         (void) mio;
1894
1895         if (pr->req == req)
1896                 mio->state = ACCEPTED;
1897         my_dispatch(peer, pr ,req);
1898         return 0;
1899 }
1900
1901 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1902 {
1903         int i;
1904         unsigned char buf[SHA256_DIGEST_SIZE];
1905         char *zero;
1906
1907         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1908
1909         /* Version check should be the very first call because it
1910           makes sure that important subsystems are intialized. */
1911         gcry_check_version (NULL);
1912      
1913         /* Disable secure memory.  */
1914         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1915      
1916         /* Tell Libgcrypt that initialization has completed. */
1917         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1918
1919         /* calculate out magic sha hash value */
1920         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1921
1922         /* calculate zero block */
1923         //FIXME check hash value
1924         zero = malloc(block_size);
1925         memset(zero, 0, block_size);
1926         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1927         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1928                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1929         printf("%s \n", zero_block);
1930         free(zero);
1931
1932         //FIXME error checks
1933         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1934         mapper->hashmaps = xhash_new(3, STRING);
1935         peer->priv = mapper;
1936         
1937         for (i = 0; i < peer->nr_ops; i++) {
1938                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1939                 mio->copyups_nodes = xhash_new(3, INTEGER);
1940                 mio->copyups = 0;
1941                 mio->err = 0;
1942                 peer->peer_reqs[i].priv = mio;
1943         }
1944
1945         for (i = 0; i < argc; i++) {
1946                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1947                         mapper->bportno = atoi(argv[i+1]);
1948                         i += 1;
1949                         continue;
1950                 }
1951                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1952                         mapper->mbportno = atoi(argv[i+1]);
1953                         i += 1;
1954                         continue;
1955                 }
1956                 /* enforce only one thread */
1957                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1958                         int t = atoi(argv[i+1]);
1959                         if (t != 1) {
1960                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1961                                 return -1;
1962                         }
1963                         i += 1;
1964                         continue;
1965                 }
1966         }
1967
1968 //      test_map(peer);
1969
1970         return 0;
1971 }
1972
1973 void print_obj(struct map_node *mn)
1974 {
1975         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
1976                         (unsigned long long) mn->objectidx, mn->object, 
1977                         (unsigned int) mn->objectlen, 
1978                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
1979 }
1980
1981 void print_map(struct map *m)
1982 {
1983         uint64_t nr_objs = m->size/block_size;
1984         if (m->size % block_size)
1985                 nr_objs++;
1986         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
1987                         m->volume, m->volumelen, 
1988                         (unsigned long long) m->size, 
1989                         (unsigned long long) nr_objs);
1990         uint64_t i;
1991         struct map_node *mn;
1992         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1993                 return;
1994         for (i = 0; i < nr_objs; i++) {
1995                 mn = find_object(m, i);
1996                 if (!mn){
1997                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
1998                         continue;
1999                 }
2000                 print_obj(mn);
2001         }
2002 }
2003
2004 /*
2005 void test_map(struct peerd *peer)
2006 {
2007         int i,j, ret;
2008         //struct sha256_ctx sha256ctx;
2009         unsigned char buf[SHA256_DIGEST_SIZE];
2010         char buf_new[XSEG_MAX_TARGETLEN + 20];
2011         struct map *m = malloc(sizeof(struct map));
2012         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2013         m->volume[XSEG_MAX_TARGETLEN] = 0;
2014         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2015         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2016         m->volumelen = XSEG_MAX_TARGETLEN;
2017         m->size = 100*block_size;
2018         m->objects = xhash_new(3, INTEGER);
2019         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2020         for (i = 0; i < 100; i++) {
2021                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2022                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2023                 
2024                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2025                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2026                 }
2027                 map_node[i].objectidx = i;
2028                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2029                 map_node[i].flags = MF_OBJECT_EXIST;
2030                 ret = insert_object(m, &map_node[i]);
2031         }
2032
2033         char *data = malloc(block_size);
2034         mapheader_to_map(m, data);
2035         uint64_t pos = mapheader_size;
2036
2037         for (i = 0; i < 100; i++) {
2038                 map_node = find_object(m, i);
2039                 if (!map_node){
2040                         printf("no object node %d \n", i);
2041                         exit(1);
2042                 }
2043                 object_to_map(data+pos, map_node);
2044                 pos += objectsize_in_map;
2045         }
2046 //      print_map(m);
2047
2048         struct map *m2 = malloc(sizeof(struct map));
2049         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2050         m->volume[XSEG_MAX_TARGETLEN] = 0;
2051         m->volumelen = XSEG_MAX_TARGETLEN;
2052
2053         m2->objects = xhash_new(3, INTEGER);
2054         ret = read_map(peer, m2, data);
2055 //      print_map(m2);
2056
2057         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2058         ssize_t r, sum = 0;
2059         while (sum < block_size) {
2060                 r = write(fd, data + sum, block_size -sum);
2061                 if (r < 0){
2062                         perror("write");
2063                         printf("write error\n");
2064                         exit(1);
2065                 } 
2066                 sum += r;
2067         }
2068         close(fd);
2069         map_node = find_object(m, 0);
2070         free(map_node);
2071         free(m);
2072 }
2073 */