e5021a30d195109df4aab6fa17d977ef5264b4ae
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22
23 #define SHA256_DIGEST_SIZE 32
24 /* hex representation of sha256 value takes up double the sha256 size */
25 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26
27 #define block_size (1<<22) //FIXME this should be defined here?
28 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
30
31 #define MF_OBJECT_EXIST         (1 << 0)
32 #define MF_OBJECT_COPYING       (1 << 1)
33 #define MF_OBJECT_WRITING       (1 << 2)
34 #define MF_OBJECT_DELETING      (1 << 3)
35 #define MF_OBJECT_DELETED       (1 << 4)
36 #define MF_OBJECT_DESTROYED     (1 << 5)
37
38 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
39
40 char *magic_string = "This a magic string. Please hash me";
41 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
42 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
43
44 //dispatch_internal mapper states
45 enum mapper_state {
46         ACCEPTED = 0,
47         WRITING = 1,
48         COPYING = 2,
49         DELETING = 3,
50         DROPPING_CACHE = 4
51 };
52
53 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
54
55 struct map_node {
56         uint32_t flags;
57         uint32_t objectidx;
58         uint32_t objectlen;
59         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
60         struct map *map;
61         uint32_t ref;
62         uint32_t waiters;
63         st_cond_t cond;
64 };
65
66 #define MF_MAP_LOADING          (1 << 0)
67 #define MF_MAP_DESTROYED        (1 << 1)
68 #define MF_MAP_WRITING          (1 << 2)
69 #define MF_MAP_DELETING         (1 << 3)
70 #define MF_MAP_DROPPING_CACHE   (1 << 4)
71 #define MF_MAP_EXCLUSIVE        (1 << 5)
72 #define MF_MAP_OPENING          (1 << 6)
73 #define MF_MAP_CLOSING          (1 << 7)
74
75 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
77
78 #define wait_on_pr(__pr, __condition__)         \
79         while (__condition__){                  \
80                 ta--;                           \
81                 __get_mapper_io(pr)->active = 0;\
82                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
83                 st_cond_wait(__pr->cond);       \
84         }
85
86 #define wait_on_mapnode(__mn, __condition__)    \
87         while (__condition__){                  \
88                 ta--;                           \
89                 __mn->waiters++;                \
90                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
91                 st_cond_wait(__mn->cond);       \
92         }
93
94 #define wait_on_map(__map, __condition__)       \
95         while (__condition__){                  \
96                 ta--;                           \
97                 __map->waiters++;               \
98                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
99                 st_cond_wait(__map->cond);      \
100         }
101
102 #define signal_pr(__pr)                         \
103         do {                                    \
104                 if (!__get_mapper_io(pr)->active){\
105                         ta++;                   \
106                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
107                         __get_mapper_io(pr)->active = 1;\
108                         st_cond_signal(__pr->cond);     \
109                 }                               \
110         }while(0)
111
112 #define signal_map(__map)                       \
113         do {                                    \
114                 if (__map->waiters) {           \
115                         ta += 1;                \
116                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
117                         __map->waiters--;       \
118                         st_cond_signal(__map->cond);    \
119                 }                               \
120         }while(0)
121
122 #define signal_mapnode(__mn)                    \
123         do {                                    \
124                 if (__mn->waiters) {            \
125                         ta += __mn->waiters;    \
126                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
127                         __mn->waiters = 0;      \
128                         st_cond_broadcast(__mn->cond);  \
129                 }                               \
130         }while(0)
131
132
133 struct map {
134         uint32_t flags;
135         uint64_t size;
136         uint32_t volumelen;
137         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138         xhash_t *objects;       /* obj_index --> map_node */
139         uint32_t ref;
140         uint32_t waiters;
141         st_cond_t cond;
142 };
143
144 struct mapperd {
145         xport bportno;          /* blocker that accesses data */
146         xport mbportno;         /* blocker that accesses maps */
147         xhash_t *hashmaps; // hash_function(target) --> struct map
148 };
149
150 struct mapper_io {
151         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
152         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153         struct map_node *copyup_node;
154         volatile int err;                       /* error flag */
155         volatile uint64_t del_pending;
156         uint64_t delobj;
157         uint64_t dcobj;
158         cb_t cb;
159         enum mapper_state state;
160         volatile int active;
161 };
162
163 /* global vars */
164 struct mapperd *mapper;
165
166 void print_map(struct map *m);
167
168 /*
169  * Helper functions
170  */
171
172 static inline struct mapperd * __get_mapperd(struct peerd *peer)
173 {
174         return (struct mapperd *) peer->priv;
175 }
176
177 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
178 {
179         return (struct mapper_io *) pr->priv;
180 }
181
182 static inline uint64_t calc_map_obj(struct map *map)
183 {
184         if (map->size == -1)
185                 return 0;
186         uint64_t nr_objs = map->size / block_size;
187         if (map->size % block_size)
188                 nr_objs++;
189         return nr_objs;
190 }
191
192 static uint32_t calc_nr_obj(struct xseg_request *req)
193 {
194         unsigned int r = 1;
195         uint64_t rem_size = req->size;
196         uint64_t obj_offset = req->offset & (block_size -1); //modulo
197         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198         rem_size -= obj_size;
199         while (rem_size > 0) {
200                 obj_size = (rem_size > block_size) ? block_size : rem_size;
201                 rem_size -= obj_size;
202                 r++;
203         }
204
205         return r;
206 }
207
208 /*
209  * Maps handling functions
210  */
211 //FIXME assert targetlen > 0
212
213
214 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
215 {
216         int r;
217         struct map *m = NULL;
218         char buf[XSEG_MAX_TARGETLEN+1];
219         //assert targetlen <= XSEG_MAX_TARGETLEN
220         strncpy(buf, target, targetlen);
221         buf[targetlen] = 0;
222         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
223         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
224         if (r < 0)
225                 return NULL;
226         return m;
227 }
228
229
230 static int insert_map(struct mapperd *mapper, struct map *map)
231 {
232         int r = -1;
233         
234         if (find_map(mapper, map->volume, map->volumelen)){
235                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
236                 goto out;
237         }
238
239         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
240                         map->volume, strlen(map->volume), (unsigned long) map);
241         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242         while (r == -XHASH_ERESIZE) {
243                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
245                 if (!new_hashmap){
246                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247                                         (unsigned long long) shift);
248                         goto out;
249                 }
250                 mapper->hashmaps = new_hashmap;
251                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
252         }
253 out:
254         return r;
255 }
256
257 static int remove_map(struct mapperd *mapper, struct map *map)
258 {
259         int r = -1;
260         
261         //assert no pending pr on map
262         
263         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264         while (r == -XHASH_ERESIZE) {
265                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
267                 if (!new_hashmap){
268                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269                                         (unsigned long long) shift);
270                         goto out;
271                 }
272                 mapper->hashmaps = new_hashmap;
273                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
274         }
275 out:
276         return r;
277 }
278
279 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
280 {
281         int r;
282         xport p;
283         struct peerd *peer = pr->peer;
284         struct xseg_request *req;
285         struct mapperd *mapper = __get_mapperd(peer);
286         void *dummy;
287
288         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
289
290         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
291         if (!req){
292                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
293                                 map->volume);
294                 goto out_err;
295         }
296
297         r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
298         if (r < 0){
299                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
300                                 map->volume);
301                 goto out_put;
302         }
303
304         char *reqtarget = xseg_get_target(peer->xseg, req);
305         if (!reqtarget)
306                 goto out_put;
307         strncpy(reqtarget, map->volume, req->targetlen);
308         req->op = X_CLOSE;
309         req->size = block_size;
310         req->offset = 0;
311         r = xseg_set_req_data(peer->xseg, req, pr);
312         if (r < 0){
313                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
314                                 map->volume);
315                 goto out_put;
316         }
317         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
318         if (p == NoPort){ 
319                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
320                                 map->volume);
321                 goto out_unset;
322         }
323         r = xseg_signal(peer->xseg, p);
324         
325         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
326         return req;
327
328 out_unset:
329         xseg_get_req_data(peer->xseg, req, &dummy);
330 out_put:
331         xseg_put_request(peer->xseg, req, pr->portno);
332 out_err:
333         return NULL;
334 }
335
336 static int close_map(struct peer_req *pr, struct map *map)
337 {
338         int err;
339         struct xseg_request *req;
340         struct peerd *peer = pr->peer;
341
342         map->flags |= MF_MAP_CLOSING;
343         req = __close_map(pr, map);
344         if (!req)
345                 return -1;
346         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347         map->flags &= ~MF_MAP_CLOSING;
348         err = req->state & XS_FAILED;
349         xseg_put_request(peer->xseg, req, pr->portno);
350         if (err)
351                 return -1;
352         return 0;
353 }
354
355 /*
356 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
357                                 char *target, uint32_t targetlen, struct map **m)
358 {
359         struct mapperd *mapper = __get_mapperd(peer);
360         int r;
361         *m = find_map(mapper, target, targetlen);
362         if (*m) {
363                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364                 if ((*m)->flags & MF_MAP_NOT_READY) {
365                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
366                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
367                         return MF_PENDING;
368                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
369                 //      return -1;
370                 // 
371                 }else {
372                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
373                         return 0;
374                 }
375         }
376         r = open_map(peer, pr, target, targetlen, 0);
377         if (r < 0)
378                 return -1; //error
379         return MF_PENDING;      
380 }
381 */
382 /*
383  * Object handling functions
384  */
385
386 struct map_node *find_object(struct map *map, uint64_t obj_index)
387 {
388         struct map_node *mn;
389         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
390         if (r < 0)
391                 return NULL;
392         return mn;
393 }
394
395 static int insert_object(struct map *map, struct map_node *mn)
396 {
397         //FIXME no find object first
398         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399         if (r == -XHASH_ERESIZE) {
400                 unsigned long shift = xhash_grow_size_shift(map->objects);
401                 map->objects = xhash_resize(map->objects, shift, NULL);
402                 if (!map->objects)
403                         return -1;
404                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
405         }
406         return r;
407 }
408
409
410 /*
411  * map read/write functions 
412  */
413 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
414 {
415         int i;
416         //hexlify sha256 value
417         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418                 sprintf(mn->object+2*i, "%02x", buf[i]);
419         }
420
421         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422         mn->objectlen = SHA256_DIGEST_SIZE * 2;
423         mn->flags = MF_OBJECT_EXIST;
424 }
425
426 static inline void map_to_object(struct map_node *mn, char *buf)
427 {
428         char c = buf[0];
429         mn->flags = 0;
430         if (c)
431                 mn->flags |= MF_OBJECT_EXIST;
432         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433         mn->object[XSEG_MAX_TARGETLEN] = 0;
434         mn->objectlen = strlen(mn->object);
435 }
436
437 static inline void object_to_map(char* buf, struct map_node *mn)
438 {
439         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440         memcpy(buf+1, mn->object, mn->objectlen);
441         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
442 }
443
444 static inline void mapheader_to_map(struct map *m, char *buf)
445 {
446         uint64_t pos = 0;
447         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448         pos += SHA256_DIGEST_SIZE;
449         memcpy(buf + pos, &m->size, sizeof(m->size));
450         pos += sizeof(m->size);
451 }
452
453
454 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
455                                 struct map *map, struct map_node *mn)
456 {
457         void *dummy;
458         struct mapperd *mapper = __get_mapperd(peer);
459         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460                                                         mapper->mbportno, X_ALLOC);
461         if (!req){
462                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
463                                 "(Map: %s [%llu]",
464                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
465                 goto out_err;
466         }
467         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
468         if (r < 0){
469                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
470                                 "(Map: %s [%llu]",
471                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
472                 goto out_put;
473         }
474         char *target = xseg_get_target(peer->xseg, req);
475         strncpy(target, map->volume, req->targetlen);
476         req->size = objectsize_in_map;
477         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
478         req->op = X_WRITE;
479         char *data = xseg_get_data(peer->xseg, req);
480         object_to_map(data, mn);
481
482         r = xseg_set_req_data(peer->xseg, req, pr);
483         if (r < 0){
484                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
485                                 "(Map: %s [%llu]",
486                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
487                 goto out_put;
488         }
489         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
490         if (p == NoPort){
491                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
492                                 "(Map: %s [%llu]",
493                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
494                 goto out_unset;
495         }
496         r = xseg_signal(peer->xseg, p);
497         if (r < 0)
498                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
499
500         XSEGLOG2(&lc, I, "Writing object %s \n\t"
501                         "Map: %s [%llu]",
502                         mn->object, map->volume, (unsigned long long) mn->objectidx);
503
504         return req;
505
506 out_unset:
507         xseg_get_req_data(peer->xseg, req, &dummy);
508 out_put:
509         xseg_put_request(peer->xseg, req, pr->portno);
510 out_err:
511         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
512                         "(Map: %s [%llu]",
513                         mn->object, map->volume, (unsigned long long) mn->objectidx);
514         return NULL;
515 }
516
517 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
518 {
519         void *dummy;
520         struct peerd *peer = pr->peer;
521         struct mapperd *mapper = __get_mapperd(peer);
522         struct map_node *mn;
523         uint64_t i, pos, max_objidx = calc_map_obj(map);
524         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
525                                                         mapper->mbportno, X_ALLOC);
526         if (!req){
527                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
528                 goto out_err;
529         }
530         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
531                                         mapheader_size + max_objidx * objectsize_in_map);
532         if (r < 0){
533                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
534                 goto out_put;
535         }
536         char *target = xseg_get_target(peer->xseg, req);
537         strncpy(target, map->volume, req->targetlen);
538         char *data = xseg_get_data(peer->xseg, req);
539         mapheader_to_map(map, data);
540         pos = mapheader_size;
541         req->op = X_WRITE;
542         req->size = req->datalen;
543         req->offset = 0;
544
545         if (map->size % block_size)
546                 max_objidx++;
547         for (i = 0; i < max_objidx; i++) {
548                 mn = find_object(map, i);
549                 if (!mn){
550                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551                                         (unsigned long long) i, map->volume);
552                         goto out_put;
553                 }
554                 object_to_map(data+pos, mn);
555                 pos += objectsize_in_map;
556         }
557         r = xseg_set_req_data(peer->xseg, req, pr);
558         if (r < 0){
559                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
560                                 map->volume);
561                 goto out_put;
562         }
563         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
564         if (p == NoPort){
565                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
566                                 map->volume);
567                 goto out_unset;
568         }
569         r = xseg_signal(peer->xseg, p);
570         if (r < 0)
571                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
572
573         map->flags |= MF_MAP_WRITING;
574         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
575         return req;
576
577 out_unset:
578         xseg_get_req_data(peer->xseg, req, &dummy);
579 out_put:
580         xseg_put_request(peer->xseg, req, pr->portno);
581 out_err:
582         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
583         return NULL;
584 }
585
586 static int write_map(struct peer_req* pr, struct map *map)
587 {
588         int r = 0;
589         struct peerd *peer = pr->peer;
590         map->flags |= MF_MAP_WRITING;
591         struct xseg_request *req = __write_map(pr, map);
592         if (!req)
593                 return -1;
594         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595         if (req->state & XS_FAILED)
596                 r = -1;
597         xseg_put_request(peer->xseg, req, pr->portno);
598         map->flags &= ~MF_MAP_WRITING;
599         return r;
600 }
601
602 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
603 {
604         int r;
605         xport p;
606         struct xseg_request *req;
607         struct peerd *peer = pr->peer;
608         struct mapperd *mapper = __get_mapperd(peer);
609         void *dummy;
610
611         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
612
613         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
614         if (!req){
615                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
616                                 m->volume);
617                 goto out_fail;
618         }
619
620         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
621         if (r < 0){
622                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
623                                 m->volume);
624                 goto out_put;
625         }
626
627         char *reqtarget = xseg_get_target(peer->xseg, req);
628         if (!reqtarget)
629                 goto out_put;
630         strncpy(reqtarget, m->volume, req->targetlen);
631         req->op = X_READ;
632         req->size = block_size;
633         req->offset = 0;
634         r = xseg_set_req_data(peer->xseg, req, pr);
635         if (r < 0){
636                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
637                                 m->volume);
638                 goto out_put;
639         }
640         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
641         if (p == NoPort){ 
642                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
643                                 m->volume);
644                 goto out_unset;
645         }
646         r = xseg_signal(peer->xseg, p);
647         
648         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
649         return req;
650
651 out_unset:
652         xseg_get_req_data(peer->xseg, req, &dummy);
653 out_put:
654         xseg_put_request(peer->xseg, req, pr->portno);
655 out_fail:
656         return NULL;
657 }
658
659 static int read_map (struct map *map, char *buf)
660 {
661         char nulls[SHA256_DIGEST_SIZE];
662         memset(nulls, 0, SHA256_DIGEST_SIZE);
663
664         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
665         if (r) {
666                 XSEGLOG2(&lc, D, "Read zeros");
667                 //read error;
668                 return -1;
669         }
670         //type 1, our type, type 0 pithos map
671         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
673         uint64_t pos;
674         uint64_t i, nr_objs;
675         struct map_node *map_node;
676         if (type) {
677                 pos = SHA256_DIGEST_SIZE;
678                 map->size = *(uint64_t *) (buf + pos);
679                 pos += sizeof(uint64_t);
680                 nr_objs = map->size / block_size;
681                 if (map->size % block_size)
682                         nr_objs++;
683                 map_node = calloc(nr_objs, sizeof(struct map_node));
684                 if (!map_node)
685                         return -1;
686
687                 for (i = 0; i < nr_objs; i++) {
688                         map_node[i].map = map;
689                         map_node[i].objectidx = i;
690                         map_node[i].waiters = 0;
691                         map_node[i].ref = 1;
692                         map_node[i].cond = st_cond_new(); //FIXME err check;
693                         map_to_object(&map_node[i], buf + pos);
694                         pos += objectsize_in_map;
695                         r = insert_object(map, &map_node[i]); //FIXME error check
696                 }
697         } else {
698                 pos = 0;
699                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
701                 if (!map_node)
702                         return -1;
703                 for (i = 0; i < max_nr_objs; i++) {
704                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
705                                 break;
706                         map_node[i].objectidx = i;
707                         map_node[i].map = map;
708                         map_node[i].waiters = 0;
709                         map_node[i].ref = 1;
710                         map_node[i].cond = st_cond_new(); //FIXME err check;
711                         pithosmap_to_object(&map_node[i], buf + pos);
712                         pos += SHA256_DIGEST_SIZE; 
713                         r = insert_object(map, &map_node[i]); //FIXME error check
714                 }
715                 map->size = i * block_size; 
716         }
717         print_map(map);
718         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
719         return 0;
720
721         //FIXME cleanup on error
722 }
723
724 static int load_map(struct peer_req *pr, struct map *map)
725 {
726         int r = 0;
727         struct xseg_request *req;
728         struct peerd *peer = pr->peer;
729         map->flags |= MF_MAP_LOADING;
730         req = __load_map(pr, map);
731         if (!req)
732                 return -1;
733         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734         map->flags &= ~MF_MAP_LOADING;
735         if (req->state & XS_FAILED){
736                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737                 xseg_put_request(peer->xseg, req, pr->portno);
738                 return -1;
739         }
740         r = read_map(map, xseg_get_data(peer->xseg, req));
741         xseg_put_request(peer->xseg, req, pr->portno);
742         return r;
743 }
744
745 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
746 {
747         int r;
748         xport p;
749         struct xseg_request *req;
750         struct peerd *peer = pr->peer;
751         struct mapperd *mapper = __get_mapperd(peer);
752         void *dummy;
753
754         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
755
756         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
757         if (!req){
758                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
759                                 m->volume);
760                 goto out_fail;
761         }
762
763         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
764         if (r < 0){
765                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
766                                 m->volume);
767                 goto out_put;
768         }
769
770         char *reqtarget = xseg_get_target(peer->xseg, req);
771         if (!reqtarget)
772                 goto out_put;
773         strncpy(reqtarget, m->volume, req->targetlen);
774         req->op = X_OPEN;
775         req->size = block_size;
776         req->offset = 0;
777         r = xseg_set_req_data(peer->xseg, req, pr);
778         if (r < 0){
779                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
780                                 m->volume);
781                 goto out_put;
782         }
783         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
784         if (p == NoPort){ 
785                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
786                                 m->volume);
787                 goto out_unset;
788         }
789         r = xseg_signal(peer->xseg, p);
790         
791         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
792         return req;
793
794 out_unset:
795         xseg_get_req_data(peer->xseg, req, &dummy);
796 out_put:
797         xseg_put_request(peer->xseg, req, pr->portno);
798 out_fail:
799         return NULL;
800 }
801
802 static int open_map(struct peer_req *pr, struct map *map)
803 {
804         int err;
805         struct xseg_request *req;
806         struct peerd *peer = pr->peer;
807
808         map->flags |= MF_MAP_OPENING;
809         req = __open_map(pr, map);
810         if (!req)
811                 return -1;
812         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
813         map->flags &= ~MF_MAP_OPENING;
814         err = req->state & XS_FAILED;
815         xseg_put_request(peer->xseg, req, pr->portno);
816         if (err)
817                 return -1;
818         else 
819                 map->flags |= MF_MAP_EXCLUSIVE;
820         return 0;
821 }
822
823 /*
824  * copy up functions
825  */
826
827 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
828 {
829         int r = 0;
830         if (mn){
831                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
832                 if (r == -XHASH_ERESIZE) {
833                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
834                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
835                         if (!new_hashmap)
836                                 goto out;
837                         mio->copyups_nodes = new_hashmap;
838                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
839                 }
840         }
841         else {
842                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
843                 if (r == -XHASH_ERESIZE) {
844                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
845                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
846                         if (!new_hashmap)
847                                 goto out;
848                         mio->copyups_nodes = new_hashmap;
849                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
850                 }
851         }
852 out:
853         return r;
854 }
855
856 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
857 {
858         struct map_node *mn;
859         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
860         if (r < 0)
861                 return NULL;
862         return mn;
863 }
864
865 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
866 {
867         struct mapperd *mapper = __get_mapperd(peer);
868         struct mapper_io *mio = __get_mapper_io(pr);
869         struct map *map = mn->map;
870         void *dummy;
871         int r = -1, i;
872         xport p;
873
874         //struct sha256_ctx sha256ctx;
875         uint32_t newtargetlen;
876         char new_target[XSEG_MAX_TARGETLEN + 1]; 
877         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
878         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
879         strncpy(new_object, map->volume, map->volumelen);
880         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
881         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
882
883         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
884         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
885                 sprintf (new_target + 2*i, "%02x", buf[i]);
886         newtargetlen = SHA256_DIGEST_SIZE  * 2;
887
888         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
889                 goto copyup_zeroblock;
890
891         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
892                                                         mapper->bportno, X_ALLOC);
893         if (!req){
894                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
895                 goto out_err;
896         }
897         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
898                                 sizeof(struct xseg_request_copy));
899         if (r < 0){
900                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
901                 goto out_put;
902         }
903
904         char *target = xseg_get_target(peer->xseg, req);
905         strncpy(target, new_target, req->targetlen);
906
907         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
908         strncpy(xcopy->target, mn->object, mn->objectlen);
909         xcopy->targetlen = mn->objectlen;
910
911         req->offset = 0;
912         req->size = block_size;
913         req->op = X_COPY;
914         r = xseg_set_req_data(peer->xseg, req, pr);
915         if (r<0){
916                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
917                 goto out_put;
918         }
919         r = __set_copyup_node(mio, req, mn);
920         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
921         if (p == NoPort) {
922                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
923                 goto out_unset;
924         }
925         xseg_signal(peer->xseg, p);
926 //      mio->copyups++;
927
928         mn->flags |= MF_OBJECT_COPYING;
929         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
930         return req;
931
932 out_unset:
933         r = __set_copyup_node(mio, req, NULL);
934         xseg_get_req_data(peer->xseg, req, &dummy);
935 out_put:
936         xseg_put_request(peer->xseg, req, pr->portno);
937 out_err:
938         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
939         return NULL;
940
941 copyup_zeroblock:
942         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
943                         "Proceeding in writing the new object in map");
944         /* construct a tmp map_node for writing purposes */
945         struct map_node newmn = *mn;
946         newmn.flags = MF_OBJECT_EXIST;
947         strncpy(newmn.object, new_target, newtargetlen);
948         newmn.object[newtargetlen] = 0;
949         newmn.objectlen = newtargetlen;
950         newmn.objectidx = mn->objectidx; 
951         req = object_write(peer, pr, map, &newmn);
952         r = __set_copyup_node(mio, req, mn);
953         if (!req){
954                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
955                                 "\n\t of map %s [%llu]",
956                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
957                 return NULL;
958         }
959         mn->flags |= MF_OBJECT_WRITING;
960         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
961         return req;
962 }
963
964 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
965 {
966         void *dummy;
967         struct peerd *peer = pr->peer;
968         struct mapperd *mapper = __get_mapperd(peer);
969         struct mapper_io *mio = __get_mapper_io(pr);
970         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
971                                                         mapper->bportno, X_ALLOC);
972         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
973         if (!req){
974                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
975                 goto out_err;
976         }
977         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
978         if (r < 0){
979                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
980                 goto out_put;
981         }
982         char *target = xseg_get_target(peer->xseg, req);
983         strncpy(target, mn->object, req->targetlen);
984         req->op = X_DELETE;
985         req->size = req->datalen;
986         req->offset = 0;
987         r = xseg_set_req_data(peer->xseg, req, pr);
988         if (r < 0){
989                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
990                 goto out_put;
991         }
992         __set_copyup_node(mio, req, mn);
993         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
994         if (p == NoPort){
995                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
996                 goto out_unset;
997         }
998         r = xseg_signal(peer->xseg, p);
999         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1000         return req;
1001
1002 out_unset:
1003         xseg_get_req_data(peer->xseg, req, &dummy);
1004 out_put:
1005         xseg_put_request(peer->xseg, req, pr->portno);
1006 out_err:
1007         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1008         return NULL;
1009 }
1010
1011 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1012 {
1013         void *dummy;
1014         struct peerd *peer = pr->peer;
1015         struct mapperd *mapper = __get_mapperd(peer);
1016         struct mapper_io *mio = __get_mapper_io(pr);
1017         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1018                                                         mapper->mbportno, X_ALLOC);
1019         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1020         if (!req){
1021                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1022                 goto out_err;
1023         }
1024         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1025         if (r < 0){
1026                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1027                 goto out_put;
1028         }
1029         char *target = xseg_get_target(peer->xseg, req);
1030         strncpy(target, map->volume, req->targetlen);
1031         req->op = X_DELETE;
1032         req->size = req->datalen;
1033         req->offset = 0;
1034         r = xseg_set_req_data(peer->xseg, req, pr);
1035         if (r < 0){
1036                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1037                 goto out_put;
1038         }
1039         __set_copyup_node(mio, req, NULL);
1040         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1041         if (p == NoPort){
1042                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1043                 goto out_unset;
1044         }
1045         r = xseg_signal(peer->xseg, p);
1046         map->flags |= MF_MAP_DELETING;
1047         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1048         return req;
1049
1050 out_unset:
1051         xseg_get_req_data(peer->xseg, req, &dummy);
1052 out_put:
1053         xseg_put_request(peer->xseg, req, pr->portno);
1054 out_err:
1055         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1056         return  NULL;
1057 }
1058
1059
1060 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1061 {
1062         struct map_node *mn = find_object(map, index);
1063         if (mn)
1064                 mn->ref++;
1065         return mn;
1066 }
1067
1068 static inline void put_mapnode(struct map_node *mn)
1069 {
1070         mn->ref--;
1071         if (!mn->ref){
1072                 //clean up mn
1073                 st_cond_destroy(mn->cond);
1074         }
1075 }
1076
1077 static inline void __get_map(struct map *map)
1078 {
1079         map->ref++;
1080 }
1081
1082 static inline void put_map(struct map *map)
1083 {
1084         struct map_node *mn;
1085         map->ref--;
1086         if (!map->ref){
1087                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1088                 //clean up map
1089                 uint64_t i;
1090                 for (i = 0; i < calc_map_obj(map); i++) {
1091                         mn = get_mapnode(map, i);
1092                         if (mn) {
1093                                 //make sure all pending operations on all objects are completed
1094                                 if (mn->flags & MF_OBJECT_NOT_READY){
1095                                         //this should never happen...
1096                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1097                                 }
1098                                 mn->flags &= MF_OBJECT_DESTROYED;
1099                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1100                                 put_mapnode(mn); //matcing get_mapnode;
1101                                 //assert mn->ref == 0;
1102                         }
1103                 }
1104                 mn = find_object(map, 0);
1105                 if (mn)
1106                         free(mn);
1107                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1108                 free(map);
1109         }
1110 }
1111
1112 static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1113 {
1114         int r;
1115         struct map *m = malloc(sizeof(struct map));
1116         if (!m){
1117                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1118                 goto out_err;
1119         }
1120         m->size = -1;
1121         strncpy(m->volume, name, namelen);
1122         m->volume[namelen] = 0;
1123         m->volumelen = namelen;
1124         m->flags = 0;
1125         m->objects = xhash_new(3, INTEGER); 
1126         if (!m->objects){
1127                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1128                                 m->volume);
1129                 goto out_map;
1130         }
1131         m->ref = 1;
1132         m->waiters = 0;
1133         m->cond = st_cond_new(); //FIXME err check;
1134         r = insert_map(mapper, m);
1135         if (r < 0){
1136                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1137                 goto out_hash;
1138         }
1139
1140         return m;
1141
1142 out_hash:
1143         xhash_free(m->objects);
1144 out_map:
1145         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1146         free(m);
1147 out_err:
1148         return NULL;
1149 }
1150
1151
1152
1153 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1154 {
1155         struct peerd *peer = pr->peer;
1156         struct mapperd *mapper = __get_mapperd(peer);
1157         (void)mapper;
1158         struct mapper_io *mio = __get_mapper_io(pr);
1159         struct map_node *mn = __get_copyup_node(mio, req);
1160
1161         mio->del_pending--;
1162         if (req->state & XS_FAILED){
1163                 mio->err = 1;
1164         }
1165         signal_mapnode(mn);
1166         xseg_put_request(peer->xseg, req, pr->portno);
1167         signal_pr(pr);
1168 }
1169
1170 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1171 {
1172         struct peerd *peer = pr->peer;
1173         struct mapperd *mapper = __get_mapperd(peer);
1174         (void)mapper;
1175         struct mapper_io *mio = __get_mapper_io(pr);
1176         struct map_node *mn = __get_copyup_node(mio, req);
1177         if (!mn){
1178                 XSEGLOG2(&lc, E, "Cannot get map node");
1179                 goto out_err;
1180         }
1181         __set_copyup_node(mio, req, NULL);
1182
1183         if (req->state & XS_FAILED){
1184                 XSEGLOG2(&lc, E, "Req failed");
1185                 mn->flags &= ~MF_OBJECT_COPYING;
1186                 mn->flags &= ~MF_OBJECT_WRITING;
1187                 goto out_err;
1188         }
1189         if (req->op == X_WRITE) {
1190                 char *target = xseg_get_target(peer->xseg, req);
1191                 (void)target;
1192                 //printf("handle object write replyi\n");
1193                 __set_copyup_node(mio, req, NULL);
1194                 //assert mn->flags & MF_OBJECT_WRITING
1195                 mn->flags &= ~MF_OBJECT_WRITING;
1196                 
1197                 struct map_node tmp;
1198                 char *data = xseg_get_data(peer->xseg, req);
1199                 map_to_object(&tmp, data);
1200                 mn->flags |= MF_OBJECT_EXIST;
1201                 if (mn->flags != MF_OBJECT_EXIST){
1202                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1203                         goto out_err;
1204                 }
1205                 //assert mn->flags & MF_OBJECT_EXIST
1206                 strncpy(mn->object, tmp.object, tmp.objectlen);
1207                 mn->object[tmp.objectlen] = 0;
1208                 mn->objectlen = tmp.objectlen;
1209                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1210                 mio->copyups--;
1211                 signal_mapnode(mn);
1212         } else if (req->op == X_COPY) {
1213         //      issue write_object;
1214                 mn->flags &= ~MF_OBJECT_COPYING;
1215                 struct map *map = mn->map;
1216                 if (!map){
1217                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1218                         goto out_err;
1219                 }
1220
1221                 /* construct a tmp map_node for writing purposes */
1222                 char *target = xseg_get_target(peer->xseg, req);
1223                 struct map_node newmn = *mn;
1224                 newmn.flags = MF_OBJECT_EXIST;
1225                 strncpy(newmn.object, target, req->targetlen);
1226                 newmn.object[req->targetlen] = 0;
1227                 newmn.objectlen = req->targetlen;
1228                 newmn.objectidx = mn->objectidx; 
1229                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1230                 if (!xreq){
1231                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1232                                         "\n\t of map %s [%llu]",
1233                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1234                         goto out_err;
1235                 }
1236                 mn->flags |= MF_OBJECT_WRITING;
1237                 __set_copyup_node (mio, xreq, mn);
1238
1239                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1240         } else {
1241                 //wtf??
1242                 ;
1243         }
1244
1245 out:
1246         xseg_put_request(peer->xseg, req, pr->portno);
1247         return;
1248
1249 out_err:
1250         mio->copyups--;
1251         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1252         mio->err = 1;
1253         if (mn)
1254                 signal_mapnode(mn);
1255         goto out;
1256
1257 }
1258
1259 struct r2o {
1260         struct map_node *mn;
1261         uint64_t offset;
1262         uint64_t size;
1263 };
1264
1265 static int req2objs(struct peer_req *pr, struct map *map, int write)
1266 {
1267         int r = 0;
1268         struct peerd *peer = pr->peer;
1269         struct mapper_io *mio = __get_mapper_io(pr);
1270         char *target = xseg_get_target(peer->xseg, pr->req);
1271         uint32_t nr_objs = calc_nr_obj(pr->req);
1272         uint64_t size = sizeof(struct xseg_reply_map) + 
1273                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1274         uint32_t idx, i, ready;
1275         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1276         struct map_node *mn;
1277         mio->copyups = 0;
1278         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1279
1280         /* get map_nodes of request */
1281         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1282         if (!mns){
1283                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1284                 return -1;
1285         }
1286         idx = 0;
1287         rem_size = pr->req->size;
1288         obj_index = pr->req->offset / block_size;
1289         obj_offset = pr->req->offset & (block_size -1); //modulo
1290         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1291         mn = get_mapnode(map, obj_index);
1292         if (!mn) {
1293                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1294                 r = -1;
1295                 goto out;
1296         }
1297         mns[idx].mn = mn;
1298         mns[idx].offset = obj_offset;
1299         mns[idx].size = obj_size;
1300         rem_size -= obj_size;
1301         while (rem_size > 0) {
1302                 idx++;
1303                 obj_index++;
1304                 obj_offset = 0;
1305                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1306                 rem_size -= obj_size;
1307                 mn = get_mapnode(map, obj_index);
1308                 if (!mn) {
1309                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1310                         r = -1;
1311                         goto out;
1312                 }
1313                 mns[idx].mn = mn;
1314                 mns[idx].offset = obj_offset;
1315                 mns[idx].size = obj_size;
1316         }
1317         if (write) {
1318                 ready = 0;
1319                 int can_wait = 0;
1320                 mio->cb=copyup_cb;
1321                 while (ready < (idx + 1)){
1322                         ready = 0;
1323                         for (i = 0; i < (idx+1); i++) {
1324                                 mn = mns[i].mn;
1325                                 //do copyups
1326                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1327                                         if (can_wait){
1328                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1329                                                 if (mn->flags & MF_OBJECT_DELETED){
1330                                                         mio->err = 1;
1331                                                 }
1332                                                 if (mio->err){
1333                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1334                                                         if (!mio->copyups){
1335                                                                 r = -1;
1336                                                                 goto out;
1337                                                         }
1338                                                 }
1339                                         }
1340                                 }
1341                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1342                                         //calc new_target, copy up object
1343                                         if (copyup_object(peer, mn, pr) == NULL){
1344                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1345                                         } else {
1346                                                 mio->copyups++;
1347                                         }
1348                                 } else {
1349                                         ready++;
1350                                 }
1351                         }
1352                         can_wait = 1;
1353                 }
1354                 /*
1355 pending_copyups:
1356                 while(mio->copyups > 0){
1357                         mio->cb = copyup_cb;
1358                         wait_on_pr(pr, 0);
1359                         ta--;
1360                         st_cond_wait(pr->cond);
1361                 }
1362                 */
1363         }
1364
1365         if (mio->err){
1366                 r = -1;
1367                 XSEGLOG2(&lc, E, "Mio->err");
1368                 goto out;
1369         }
1370
1371         /* resize request to fit reply */
1372         char buf[XSEG_MAX_TARGETLEN];
1373         strncpy(buf, target, pr->req->targetlen);
1374         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1375         if (r < 0) {
1376                 XSEGLOG2(&lc, E, "Cannot resize request");
1377                 goto out;
1378         }
1379         target = xseg_get_target(peer->xseg, pr->req);
1380         strncpy(target, buf, pr->req->targetlen);
1381
1382         /* structure reply */
1383         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1384         reply->cnt = nr_objs;
1385         for (i = 0; i < (idx+1); i++) {
1386                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1387                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1388                 reply->segs[i].offset = mns[i].offset;
1389                 reply->segs[i].size = mns[i].size;
1390         }
1391 out:
1392         for (i = 0; i < idx; i++) {
1393                 put_mapnode(mns[i].mn);
1394         }
1395         free(mns);
1396         return r;
1397 }
1398
1399 static int do_dropcache(struct peer_req *pr, struct map *map)
1400 {
1401         struct map_node *mn;
1402         struct peerd *peer = pr->peer;
1403         struct mapperd *mapper = __get_mapperd(peer);
1404         uint64_t i;
1405         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1406         map->flags |= MF_MAP_DROPPING_CACHE;
1407         for (i = 0; i < calc_map_obj(map); i++) {
1408                 mn = get_mapnode(map, i);
1409                 if (mn) {
1410                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1411                                 //make sure all pending operations on all objects are completed
1412                                 if (mn->flags & MF_OBJECT_NOT_READY){
1413                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1414                                 }
1415                                 mn->flags &= MF_OBJECT_DESTROYED;
1416                         }
1417                         put_mapnode(mn);
1418                 }
1419         }
1420         map->flags &= ~MF_MAP_DROPPING_CACHE;
1421         map->flags |= MF_MAP_DESTROYED;
1422         remove_map(mapper, map);
1423         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1424         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1425         return 0;
1426 }
1427
1428 static int do_info(struct peer_req *pr, struct map *map)
1429 {
1430         struct peerd *peer = pr->peer;
1431         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1432         xinfo->size = map->size;
1433         return 0;
1434 }
1435
1436
1437 static int do_close(struct peer_req *pr, struct map *map)
1438 {
1439 //      struct peerd *peer = pr->peer;
1440 //      struct xseg_request *req;
1441         if (map->flags & MF_MAP_EXCLUSIVE) 
1442                 close_map(pr, map);
1443         return do_dropcache(pr, map);
1444 }
1445
1446 static int do_destroy(struct peer_req *pr, struct map *map)
1447 {
1448         uint64_t i;
1449         struct peerd *peer = pr->peer;
1450         struct mapper_io *mio = __get_mapper_io(pr);
1451         struct map_node *mn;
1452         struct xseg_request *req;
1453         
1454         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1455         map->flags |= MF_MAP_DELETING;
1456         req = delete_map(pr, map);
1457         if (!req)
1458                 return -1;
1459         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1460         if (req->state & XS_FAILED){
1461                 xseg_put_request(peer->xseg, req, pr->portno);
1462                 map->flags &= ~MF_MAP_DELETING;
1463                 return -1;
1464         }
1465         xseg_put_request(peer->xseg, req, pr->portno);
1466         //FIXME
1467         uint64_t nr_obj = calc_map_obj(map);
1468         uint64_t deleted = 0;
1469         while (deleted < nr_obj){ 
1470                 deleted = 0;
1471                 for (i = 0; i < nr_obj; i++){
1472                         mn = get_mapnode(map, i);
1473                         if (mn) {
1474                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1475                                         if (mn->flags & MF_OBJECT_EXIST){
1476                                                 //make sure all pending operations on all objects are completed
1477                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1478                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1479                                                 }
1480                                                 req = delete_object(pr, mn);
1481                                                 if (!req)
1482                                                         if (mio->del_pending){
1483                                                                 goto wait_pending;
1484                                                         } else {
1485                                                                 continue;
1486                                                         }
1487                                                 else {
1488                                                         mio->del_pending++;
1489                                                 }
1490                                         }
1491                                         mn->flags &= MF_OBJECT_DESTROYED;
1492                                 }
1493                                 put_mapnode(mn);
1494                         }
1495                         deleted++;
1496                 }
1497 wait_pending:
1498                 mio->cb = deletion_cb;
1499                 wait_on_pr(pr, mio->del_pending > 0);
1500         }
1501         mio->cb = NULL;
1502         map->flags &= ~MF_MAP_DELETING;
1503         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1504         return do_close(pr, map);
1505 }
1506
1507 static int do_mapr(struct peer_req *pr, struct map *map)
1508 {
1509         struct peerd *peer = pr->peer;
1510         int r = req2objs(pr, map, 0);
1511         if  (r < 0){
1512                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1513                                 map->volume, 
1514                                 (unsigned long long) pr->req->offset, 
1515                                 (unsigned long long) (pr->req->offset + pr->req->size));
1516                 return -1;
1517         }
1518         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1519                         map->volume, 
1520                         (unsigned long long) pr->req->offset, 
1521                         (unsigned long long) (pr->req->offset + pr->req->size));
1522         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1523                         (unsigned long long) pr->req->offset,
1524                         (unsigned long long) pr->req->size);
1525         char buf[XSEG_MAX_TARGETLEN+1];
1526         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1527         int i;
1528         for (i = 0; i < reply->cnt; i++) {
1529                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1530                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1531                 buf[reply->segs[i].targetlen] = 0;
1532                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1533                                 (unsigned long long) reply->segs[i].offset,
1534                                 (unsigned long long) reply->segs[i].size);
1535         }
1536         return 0;
1537 }
1538
1539 static int do_mapw(struct peer_req *pr, struct map *map)
1540 {
1541         struct peerd *peer = pr->peer;
1542         int r = req2objs(pr, map, 1);
1543         if  (r < 0){
1544                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1545                                 map->volume, 
1546                                 (unsigned long long) pr->req->offset, 
1547                                 (unsigned long long) (pr->req->offset + pr->req->size));
1548                 return -1;
1549         }
1550         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1551                         map->volume, 
1552                         (unsigned long long) pr->req->offset, 
1553                         (unsigned long long) (pr->req->offset + pr->req->size));
1554         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1555                         (unsigned long long) pr->req->offset,
1556                         (unsigned long long) pr->req->size);
1557         char buf[XSEG_MAX_TARGETLEN+1];
1558         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1559         int i;
1560         for (i = 0; i < reply->cnt; i++) {
1561                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1562                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1563                 buf[reply->segs[i].targetlen] = 0;
1564                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1565                                 (unsigned long long) reply->segs[i].offset,
1566                                 (unsigned long long) reply->segs[i].size);
1567         }
1568         return 0;
1569 }
1570
1571 //here map is the parent map
1572 static int do_clone(struct peer_req *pr, struct map *map)
1573 {
1574         /*
1575         FIXME check if clone map exists
1576         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1577         if (clonemap)
1578                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1579                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1580                 return -1;
1581         */
1582
1583         int r;
1584         char buf[XSEG_MAX_TARGETLEN];
1585         struct peerd *peer = pr->peer;
1586         struct mapperd *mapper = __get_mapperd(peer);
1587         char *target = xseg_get_target(peer->xseg, pr->req);
1588         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1589         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1590         struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1591         if (!clonemap) 
1592                 return -1;
1593
1594         if (xclone->size == -1)
1595                 clonemap->size = map->size;
1596         else
1597                 clonemap->size = xclone->size;
1598         if (clonemap->size < map->size){
1599                 target = xseg_get_target(peer->xseg, pr->req);
1600                 strncpy(buf, target, pr->req->targetlen);
1601                 buf[pr->req->targetlen] = 0;
1602                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1603                                 "\n\t for requested clone %s",
1604                                 (unsigned long long) xclone->size,
1605                                 (unsigned long long) map->size, buf);
1606                 goto out_err;
1607         }
1608         
1609         //alloc and init map_nodes
1610         unsigned long c = clonemap->size/block_size + 1;
1611         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1612         if (!map_nodes){
1613                 goto out_err;
1614         }
1615         int i;
1616         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1617                 struct map_node *mn = get_mapnode(map, i);
1618                 if (mn) {
1619                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1620                         map_nodes[i].objectlen = mn->objectlen;
1621                         put_mapnode(mn);
1622                 } else {
1623                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1624                         map_nodes[i].objectlen = strlen(zero_block);
1625                 }
1626                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1627                 map_nodes[i].flags = 0;
1628                 map_nodes[i].objectidx = i;
1629                 map_nodes[i].map = clonemap;
1630                 map_nodes[i].ref = 1;
1631                 map_nodes[i].waiters = 0;
1632                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1633                 r = insert_object(clonemap, &map_nodes[i]);
1634                 if (r < 0){
1635                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1636                         goto out_err;
1637                 }
1638         }
1639         r = write_map(pr, clonemap);
1640         if (r < 0){
1641                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1642                 goto out_err;
1643         }
1644         return 0;
1645
1646 out_err:
1647         put_map(clonemap);
1648         return -1;
1649 }
1650
1651 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1652 {
1653         int r, opened = 0;
1654 retry_open:
1655         if (flags & MF_EXCLUSIVE){
1656                 r = open_map(pr, map);
1657                 if (r < 0) {
1658                         if (flags & MF_FORCE){
1659
1660                                 goto retry_open;
1661                         }
1662                 } else {
1663                         opened = 1;
1664                 }
1665         }
1666         r = load_map(pr, map);
1667         if (r < 0 && opened){
1668                 close_map(pr, map);
1669         }
1670         return r;
1671 }
1672
1673 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1674 {
1675         int r;
1676         struct peerd *peer = pr->peer;
1677         struct mapperd *mapper = __get_mapperd(peer);
1678         struct map *map = find_map(mapper, name, namelen);
1679         if (!map){
1680                 if (flags & MF_LOAD){
1681                         map = create_map(mapper, name, namelen);
1682                         if (!map)
1683                                 return NULL;
1684                         r = open_load_map(pr, map, flags);
1685                         if (r < 0){
1686                                 do_dropcache(pr, map);
1687                                 return NULL;
1688                         }
1689                 } else {
1690                         return NULL;
1691                 }
1692         } else if (map->flags & MF_MAP_DESTROYED){
1693                 return NULL;
1694         }
1695         __get_map(map);
1696         return map;
1697
1698 }
1699
1700 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1701                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1702 {
1703         //struct peerd *peer = pr->peer;
1704         struct map *map;
1705 start:
1706         map = get_map(pr, name, namelen, flags);
1707         if (!map)
1708                 return -1;
1709         if (map->flags & MF_MAP_NOT_READY){
1710                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1711                 put_map(map);
1712                 goto start;
1713         }
1714         int r = action(pr, map);
1715         //always drop cache if map not read exclusively
1716         if (!(map->flags & MF_MAP_EXCLUSIVE))
1717                 do_dropcache(pr, map);
1718         //maybe capture ref before and compare here?
1719         if (map->ref > 1){
1720                 signal_map(map);
1721         }
1722         put_map(map);
1723         return r;
1724 }
1725
1726 void * handle_info(struct peer_req *pr)
1727 {
1728         struct peerd *peer = pr->peer;
1729         char *target = xseg_get_target(peer->xseg, pr->req);
1730         int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
1731         if (r < 0)
1732                 fail(peer, pr);
1733         else
1734                 complete(peer, pr);
1735         ta--;
1736         return NULL;
1737 }
1738
1739 void * handle_clone(struct peer_req *pr)
1740 {
1741         int r;
1742         struct peerd *peer = pr->peer;
1743         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1744         if (!xclone) {
1745                 r = -1;
1746                 goto out;
1747         }
1748         if (xclone->targetlen){
1749                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
1750         } else {
1751                 if (!xclone->size){
1752                         r = -1;
1753                 } else {
1754                         struct map *map;
1755                         char *target = xseg_get_target(peer->xseg, pr->req);
1756                         XSEGLOG2(&lc, I, "Creating volume");
1757                         map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
1758                         if (map){
1759                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1760                                 if (map->ref <= 2) //initial one + one ref from __get_map
1761                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1762                                 put_map(map); //matches get_map
1763                                 r = -1;
1764                                 goto out;
1765                         }
1766                         //create a new empty map of size
1767                         map = create_map(mapper, target, pr->req->targetlen);
1768                         if (!map){
1769                                 r = -1;
1770                                 goto out;
1771                         }
1772                         map->size = xclone->size;
1773                         //populate_map with zero objects;
1774                         uint64_t nr_objs = xclone->size / block_size;
1775                         if (xclone->size % block_size)
1776                                 nr_objs++;
1777                                 
1778                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1779                         if (!map_nodes){
1780                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1781                                 r = -1;
1782                                 goto out;
1783                         }
1784                         uint64_t i;
1785                         for (i = 0; i < nr_objs; i++) {
1786                                 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1787                                 map_nodes[i].objectlen = strlen(zero_block);
1788                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1789                                 map_nodes[i].flags = 0;
1790                                 map_nodes[i].objectidx = i;
1791                                 map_nodes[i].map = map;
1792                                 map_nodes[i].ref = 1;
1793                                 map_nodes[i].waiters = 0;
1794                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1795                                 r = insert_object(map, &map_nodes[i]);
1796                                 if (r < 0){
1797                                         do_dropcache(pr, map);
1798                                         r = -1;
1799                                         goto out;
1800                                 }
1801                         }
1802                         r = write_map(pr, map);
1803                         if (r < 0){
1804                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1805                                 do_dropcache(pr, map);
1806                                 goto out;
1807                         }
1808                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1809                         r = 0;
1810                         do_dropcache(pr, map); //drop cache here for consistency
1811                 }
1812         }
1813 out:
1814         if (r < 0)
1815                 fail(peer, pr);
1816         else
1817                 complete(peer, pr);
1818         ta--;
1819         return NULL;
1820 }
1821
1822 void * handle_mapr(struct peer_req *pr)
1823 {
1824         struct peerd *peer = pr->peer;
1825         char *target = xseg_get_target(peer->xseg, pr->req);
1826         int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
1827         if (r < 0)
1828                 fail(peer, pr);
1829         else
1830                 complete(peer, pr);
1831         ta--;
1832         return NULL;
1833 }
1834
1835 void * handle_mapw(struct peer_req *pr)
1836 {
1837         struct peerd *peer = pr->peer;
1838         char *target = xseg_get_target(peer->xseg, pr->req);
1839         int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1840         if (r < 0)
1841                 fail(peer, pr);
1842         else
1843                 complete(peer, pr);
1844         XSEGLOG2(&lc, D, "Ta: %d", ta);
1845         ta--;
1846         return NULL;
1847 }
1848
1849 void * handle_destroy(struct peer_req *pr)
1850 {
1851         struct peerd *peer = pr->peer;
1852         char *target = xseg_get_target(peer->xseg, pr->req);
1853         int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1854         if (r < 0)
1855                 fail(peer, pr);
1856         else
1857                 complete(peer, pr);
1858         ta--;
1859         return NULL;
1860 }
1861
1862 void * handle_close(struct peer_req *pr)
1863 {
1864         struct peerd *peer = pr->peer;
1865         char *target = xseg_get_target(peer->xseg, pr->req);
1866         //here we do not want to load
1867         int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
1868         if (r < 0)
1869                 fail(peer, pr);
1870         else
1871                 complete(peer, pr);
1872         ta--;
1873         return NULL;
1874 }
1875
1876 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1877                         struct xseg_request *req)
1878 {
1879         //struct mapperd *mapper = __get_mapperd(peer);
1880         struct mapper_io *mio = __get_mapper_io(pr);
1881         void *(*action)(struct peer_req *) = NULL;
1882
1883         mio->state = ACCEPTED;
1884         mio->err = 0;
1885         mio->cb = NULL;
1886         switch (pr->req->op) {
1887                 /* primary xseg operations of mapper */
1888                 case X_CLONE: action = handle_clone; break;
1889                 case X_MAPR: action = handle_mapr; break;
1890                 case X_MAPW: action = handle_mapw; break;
1891 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1892                 case X_INFO: action = handle_info; break;
1893                 case X_DELETE: action = handle_destroy; break;
1894                 case X_CLOSE: action = handle_close; break;
1895                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1896         }
1897         if (action){
1898                 ta++;
1899                 mio->active = 1;
1900                 st_thread_create(action, pr, 0, 0);
1901         }
1902         return 0;
1903
1904 }
1905
1906 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1907                 enum dispatch_reason reason)
1908 {
1909         struct mapperd *mapper = __get_mapperd(peer);
1910         (void) mapper;
1911         struct mapper_io *mio = __get_mapper_io(pr);
1912         (void) mio;
1913
1914
1915         if (reason == dispatch_accept)
1916                 dispatch_accepted(peer, pr, req);
1917         else {
1918                 if (mio->cb){
1919                         mio->cb(pr, req);
1920                 } else { 
1921                         signal_pr(pr);
1922                 }
1923         }
1924         return 0;
1925 }
1926
1927 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1928 {
1929         int i;
1930         unsigned char buf[SHA256_DIGEST_SIZE];
1931         unsigned char *zero;
1932
1933         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1934
1935         /* Version check should be the very first call because it
1936           makes sure that important subsystems are intialized. */
1937         gcry_check_version (NULL);
1938      
1939         /* Disable secure memory.  */
1940         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1941      
1942         /* Tell Libgcrypt that initialization has completed. */
1943         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1944
1945         /* calculate out magic sha hash value */
1946         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1947
1948         /* calculate zero block */
1949         //FIXME check hash value
1950         zero = malloc(block_size);
1951         memset(zero, 0, block_size);
1952         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1953         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1954                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1955         printf("%s \n", zero_block);
1956         free(zero);
1957
1958         //FIXME error checks
1959         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1960         peer->priv = mapperd;
1961         mapper = mapperd;
1962         mapper->hashmaps = xhash_new(3, STRING);
1963         
1964         for (i = 0; i < peer->nr_ops; i++) {
1965                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1966                 mio->copyups_nodes = xhash_new(3, INTEGER);
1967                 mio->copyups = 0;
1968                 mio->err = 0;
1969                 mio->active = 0;
1970                 peer->peer_reqs[i].priv = mio;
1971         }
1972
1973         for (i = 0; i < argc; i++) {
1974                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1975                         mapper->bportno = atoi(argv[i+1]);
1976                         i += 1;
1977                         continue;
1978                 }
1979                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1980                         mapper->mbportno = atoi(argv[i+1]);
1981                         i += 1;
1982                         continue;
1983                 }
1984                 /* enforce only one thread */
1985                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1986                         int t = atoi(argv[i+1]);
1987                         if (t != 1) {
1988                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1989                                 return -1;
1990                         }
1991                         i += 1;
1992                         continue;
1993                 }
1994         }
1995
1996         const struct sched_param param = { .sched_priority = 99 };
1997         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1998
1999
2000 //      test_map(peer);
2001
2002         return 0;
2003 }
2004
2005 void print_obj(struct map_node *mn)
2006 {
2007         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2008                         (unsigned long long) mn->objectidx, mn->object, 
2009                         (unsigned int) mn->objectlen, 
2010                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2011 }
2012
2013 void print_map(struct map *m)
2014 {
2015         uint64_t nr_objs = m->size/block_size;
2016         if (m->size % block_size)
2017                 nr_objs++;
2018         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2019                         m->volume, m->volumelen, 
2020                         (unsigned long long) m->size, 
2021                         (unsigned long long) nr_objs);
2022         uint64_t i;
2023         struct map_node *mn;
2024         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2025                 return;
2026         for (i = 0; i < nr_objs; i++) {
2027                 mn = find_object(m, i);
2028                 if (!mn){
2029                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2030                         continue;
2031                 }
2032                 print_obj(mn);
2033         }
2034 }
2035
2036 /*
2037 void test_map(struct peerd *peer)
2038 {
2039         int i,j, ret;
2040         //struct sha256_ctx sha256ctx;
2041         unsigned char buf[SHA256_DIGEST_SIZE];
2042         char buf_new[XSEG_MAX_TARGETLEN + 20];
2043         struct map *m = malloc(sizeof(struct map));
2044         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2045         m->volume[XSEG_MAX_TARGETLEN] = 0;
2046         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2047         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2048         m->volumelen = XSEG_MAX_TARGETLEN;
2049         m->size = 100*block_size;
2050         m->objects = xhash_new(3, INTEGER);
2051         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2052         for (i = 0; i < 100; i++) {
2053                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2054                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2055                 
2056                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2057                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2058                 }
2059                 map_node[i].objectidx = i;
2060                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2061                 map_node[i].flags = MF_OBJECT_EXIST;
2062                 ret = insert_object(m, &map_node[i]);
2063         }
2064
2065         char *data = malloc(block_size);
2066         mapheader_to_map(m, data);
2067         uint64_t pos = mapheader_size;
2068
2069         for (i = 0; i < 100; i++) {
2070                 map_node = find_object(m, i);
2071                 if (!map_node){
2072                         printf("no object node %d \n", i);
2073                         exit(1);
2074                 }
2075                 object_to_map(data+pos, map_node);
2076                 pos += objectsize_in_map;
2077         }
2078 //      print_map(m);
2079
2080         struct map *m2 = malloc(sizeof(struct map));
2081         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2082         m->volume[XSEG_MAX_TARGETLEN] = 0;
2083         m->volumelen = XSEG_MAX_TARGETLEN;
2084
2085         m2->objects = xhash_new(3, INTEGER);
2086         ret = read_map(peer, m2, data);
2087 //      print_map(m2);
2088
2089         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2090         ssize_t r, sum = 0;
2091         while (sum < block_size) {
2092                 r = write(fd, data + sum, block_size -sum);
2093                 if (r < 0){
2094                         perror("write");
2095                         printf("write error\n");
2096                         exit(1);
2097                 } 
2098                 sum += r;
2099         }
2100         close(fd);
2101         map_node = find_object(m, 0);
2102         free(map_node);
2103         free(m);
2104 }
2105 */