modify logging mechanism to support redirection of std streams and logfile reopen
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22
23 #define SHA256_DIGEST_SIZE 32
24 /* hex representation of sha256 value takes up double the sha256 size */
25 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26
27 #define block_size (1<<22) //FIXME this should be defined here?
28 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
30
31 #define MF_OBJECT_EXIST         (1 << 0)
32 #define MF_OBJECT_COPYING       (1 << 1)
33 #define MF_OBJECT_WRITING       (1 << 2)
34 #define MF_OBJECT_DELETING      (1 << 3)
35 #define MF_OBJECT_DELETED       (1 << 4)
36 #define MF_OBJECT_DESTROYED     (1 << 5)
37
38 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
39
40 char *magic_string = "This a magic string. Please hash me";
41 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
42 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
43
44 //dispatch_internal mapper states
45 enum mapper_state {
46         ACCEPTED = 0,
47         WRITING = 1,
48         COPYING = 2,
49         DELETING = 3,
50         DROPPING_CACHE = 4
51 };
52
53 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
54
55 struct map_node {
56         uint32_t flags;
57         uint32_t objectidx;
58         uint32_t objectlen;
59         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
60         struct map *map;
61         uint32_t ref;
62         uint32_t waiters;
63         st_cond_t cond;
64 };
65
66 #define MF_MAP_LOADING          (1 << 0)
67 #define MF_MAP_DESTROYED        (1 << 1)
68 #define MF_MAP_WRITING          (1 << 2)
69 #define MF_MAP_DELETING         (1 << 3)
70 #define MF_MAP_DROPPING_CACHE   (1 << 4)
71 #define MF_MAP_EXCLUSIVE        (1 << 5)
72 #define MF_MAP_OPENING          (1 << 6)
73 #define MF_MAP_CLOSING          (1 << 7)
74
75 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
77
78 #define wait_on_pr(__pr, __condition__)         \
79         while (__condition__){                  \
80                 ta--;                           \
81                 __get_mapper_io(pr)->active = 0;\
82                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
83                 st_cond_wait(__pr->cond);       \
84         }
85
86 #define wait_on_mapnode(__mn, __condition__)    \
87         while (__condition__){                  \
88                 ta--;                           \
89                 __mn->waiters++;                \
90                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
91                 st_cond_wait(__mn->cond);       \
92         }
93
94 #define wait_on_map(__map, __condition__)       \
95         while (__condition__){                  \
96                 ta--;                           \
97                 __map->waiters++;               \
98                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
99                 st_cond_wait(__map->cond);      \
100         }
101
102 #define signal_pr(__pr)                         \
103         do {                                    \
104                 if (!__get_mapper_io(pr)->active){\
105                         ta++;                   \
106                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
107                         __get_mapper_io(pr)->active = 1;\
108                         st_cond_signal(__pr->cond);     \
109                 }                               \
110         }while(0)
111
112 #define signal_map(__map)                       \
113         do {                                    \
114                 if (__map->waiters) {           \
115                         ta += 1;                \
116                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
117                         __map->waiters--;       \
118                         st_cond_signal(__map->cond);    \
119                 }                               \
120         }while(0)
121
122 #define signal_mapnode(__mn)                    \
123         do {                                    \
124                 if (__mn->waiters) {            \
125                         ta += __mn->waiters;    \
126                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
127                         __mn->waiters = 0;      \
128                         st_cond_broadcast(__mn->cond);  \
129                 }                               \
130         }while(0)
131
132
133 struct map {
134         uint32_t flags;
135         uint64_t size;
136         uint32_t volumelen;
137         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138         xhash_t *objects;       /* obj_index --> map_node */
139         uint32_t ref;
140         uint32_t waiters;
141         st_cond_t cond;
142 };
143
144 struct mapperd {
145         xport bportno;          /* blocker that accesses data */
146         xport mbportno;         /* blocker that accesses maps */
147         xhash_t *hashmaps; // hash_function(target) --> struct map
148 };
149
150 struct mapper_io {
151         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
152         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153         struct map_node *copyup_node;
154         volatile int err;                       /* error flag */
155         volatile uint64_t del_pending;
156         uint64_t delobj;
157         uint64_t dcobj;
158         cb_t cb;
159         enum mapper_state state;
160         volatile int active;
161 };
162
163 /* global vars */
164 struct mapperd *mapper;
165
166 void print_map(struct map *m);
167
168 /*
169  * Helper functions
170  */
171
172 static inline struct mapperd * __get_mapperd(struct peerd *peer)
173 {
174         return (struct mapperd *) peer->priv;
175 }
176
177 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
178 {
179         return (struct mapper_io *) pr->priv;
180 }
181
182 static inline uint64_t calc_map_obj(struct map *map)
183 {
184         if (map->size == -1)
185                 return 0;
186         uint64_t nr_objs = map->size / block_size;
187         if (map->size % block_size)
188                 nr_objs++;
189         return nr_objs;
190 }
191
192 static uint32_t calc_nr_obj(struct xseg_request *req)
193 {
194         unsigned int r = 1;
195         uint64_t rem_size = req->size;
196         uint64_t obj_offset = req->offset & (block_size -1); //modulo
197         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198         rem_size -= obj_size;
199         while (rem_size > 0) {
200                 obj_size = (rem_size > block_size) ? block_size : rem_size;
201                 rem_size -= obj_size;
202                 r++;
203         }
204
205         return r;
206 }
207
208 /*
209  * Maps handling functions
210  */
211 //FIXME assert targetlen > 0
212
213
214 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
215 {
216         int r;
217         struct map *m = NULL;
218         char buf[XSEG_MAX_TARGETLEN+1];
219         //assert targetlen <= XSEG_MAX_TARGETLEN
220         strncpy(buf, target, targetlen);
221         buf[targetlen] = 0;
222         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
223         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
224         if (r < 0)
225                 return NULL;
226         return m;
227 }
228
229
230 static int insert_map(struct mapperd *mapper, struct map *map)
231 {
232         int r = -1;
233         
234         if (find_map(mapper, map->volume, map->volumelen)){
235                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
236                 goto out;
237         }
238
239         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
240                         map->volume, strlen(map->volume), (unsigned long) map);
241         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242         while (r == -XHASH_ERESIZE) {
243                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
245                 if (!new_hashmap){
246                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247                                         (unsigned long long) shift);
248                         goto out;
249                 }
250                 mapper->hashmaps = new_hashmap;
251                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
252         }
253 out:
254         return r;
255 }
256
257 static int remove_map(struct mapperd *mapper, struct map *map)
258 {
259         int r = -1;
260         
261         //assert no pending pr on map
262         
263         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264         while (r == -XHASH_ERESIZE) {
265                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
267                 if (!new_hashmap){
268                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269                                         (unsigned long long) shift);
270                         goto out;
271                 }
272                 mapper->hashmaps = new_hashmap;
273                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
274         }
275 out:
276         return r;
277 }
278
279 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
280 {
281         int r;
282         xport p;
283         struct peerd *peer = pr->peer;
284         struct xseg_request *req;
285         struct mapperd *mapper = __get_mapperd(peer);
286         void *dummy;
287
288         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
289
290         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
291         if (!req){
292                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
293                                 map->volume);
294                 goto out_err;
295         }
296
297         r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
298         if (r < 0){
299                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
300                                 map->volume);
301                 goto out_put;
302         }
303
304         char *reqtarget = xseg_get_target(peer->xseg, req);
305         if (!reqtarget)
306                 goto out_put;
307         strncpy(reqtarget, map->volume, req->targetlen);
308         req->op = X_CLOSE;
309         req->size = block_size;
310         req->offset = 0;
311         r = xseg_set_req_data(peer->xseg, req, pr);
312         if (r < 0){
313                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
314                                 map->volume);
315                 goto out_put;
316         }
317         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
318         if (p == NoPort){ 
319                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
320                                 map->volume);
321                 goto out_unset;
322         }
323         r = xseg_signal(peer->xseg, p);
324         
325         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
326         return req;
327
328 out_unset:
329         xseg_get_req_data(peer->xseg, req, &dummy);
330 out_put:
331         xseg_put_request(peer->xseg, req, pr->portno);
332 out_err:
333         return NULL;
334 }
335
336 static int close_map(struct peer_req *pr, struct map *map)
337 {
338         int err;
339         struct xseg_request *req;
340         struct peerd *peer = pr->peer;
341
342         map->flags |= MF_MAP_CLOSING;
343         req = __close_map(pr, map);
344         if (!req)
345                 return -1;
346         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347         map->flags &= ~MF_MAP_CLOSING;
348         err = req->state & XS_FAILED;
349         xseg_put_request(peer->xseg, req, pr->portno);
350         if (err)
351                 return -1;
352         return 0;
353 }
354
355 /*
356 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
357                                 char *target, uint32_t targetlen, struct map **m)
358 {
359         struct mapperd *mapper = __get_mapperd(peer);
360         int r;
361         *m = find_map(mapper, target, targetlen);
362         if (*m) {
363                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364                 if ((*m)->flags & MF_MAP_NOT_READY) {
365                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
366                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
367                         return MF_PENDING;
368                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
369                 //      return -1;
370                 // 
371                 }else {
372                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
373                         return 0;
374                 }
375         }
376         r = open_map(peer, pr, target, targetlen, 0);
377         if (r < 0)
378                 return -1; //error
379         return MF_PENDING;      
380 }
381 */
382 /*
383  * Object handling functions
384  */
385
386 struct map_node *find_object(struct map *map, uint64_t obj_index)
387 {
388         struct map_node *mn;
389         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
390         if (r < 0)
391                 return NULL;
392         return mn;
393 }
394
395 static int insert_object(struct map *map, struct map_node *mn)
396 {
397         //FIXME no find object first
398         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399         if (r == -XHASH_ERESIZE) {
400                 unsigned long shift = xhash_grow_size_shift(map->objects);
401                 map->objects = xhash_resize(map->objects, shift, NULL);
402                 if (!map->objects)
403                         return -1;
404                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
405         }
406         return r;
407 }
408
409
410 /*
411  * map read/write functions 
412  */
413 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
414 {
415         int i;
416         //hexlify sha256 value
417         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418                 sprintf(mn->object+2*i, "%02x", buf[i]);
419         }
420
421         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422         mn->objectlen = SHA256_DIGEST_SIZE * 2;
423         mn->flags = MF_OBJECT_EXIST;
424 }
425
426 static inline void map_to_object(struct map_node *mn, char *buf)
427 {
428         char c = buf[0];
429         mn->flags = 0;
430         if (c)
431                 mn->flags |= MF_OBJECT_EXIST;
432         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433         mn->object[XSEG_MAX_TARGETLEN] = 0;
434         mn->objectlen = strlen(mn->object);
435 }
436
437 static inline void object_to_map(char* buf, struct map_node *mn)
438 {
439         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440         memcpy(buf+1, mn->object, mn->objectlen);
441         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
442 }
443
444 static inline void mapheader_to_map(struct map *m, char *buf)
445 {
446         uint64_t pos = 0;
447         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448         pos += SHA256_DIGEST_SIZE;
449         memcpy(buf + pos, &m->size, sizeof(m->size));
450         pos += sizeof(m->size);
451 }
452
453
454 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
455                                 struct map *map, struct map_node *mn)
456 {
457         void *dummy;
458         struct mapperd *mapper = __get_mapperd(peer);
459         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460                                                         mapper->mbportno, X_ALLOC);
461         if (!req){
462                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
463                                 "(Map: %s [%llu]",
464                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
465                 goto out_err;
466         }
467         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
468         if (r < 0){
469                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
470                                 "(Map: %s [%llu]",
471                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
472                 goto out_put;
473         }
474         char *target = xseg_get_target(peer->xseg, req);
475         strncpy(target, map->volume, req->targetlen);
476         req->size = objectsize_in_map;
477         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
478         req->op = X_WRITE;
479         char *data = xseg_get_data(peer->xseg, req);
480         object_to_map(data, mn);
481
482         r = xseg_set_req_data(peer->xseg, req, pr);
483         if (r < 0){
484                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
485                                 "(Map: %s [%llu]",
486                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
487                 goto out_put;
488         }
489         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
490         if (p == NoPort){
491                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
492                                 "(Map: %s [%llu]",
493                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
494                 goto out_unset;
495         }
496         r = xseg_signal(peer->xseg, p);
497         if (r < 0)
498                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
499
500         XSEGLOG2(&lc, I, "Writing object %s \n\t"
501                         "Map: %s [%llu]",
502                         mn->object, map->volume, (unsigned long long) mn->objectidx);
503
504         return req;
505
506 out_unset:
507         xseg_get_req_data(peer->xseg, req, &dummy);
508 out_put:
509         xseg_put_request(peer->xseg, req, pr->portno);
510 out_err:
511         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
512                         "(Map: %s [%llu]",
513                         mn->object, map->volume, (unsigned long long) mn->objectidx);
514         return NULL;
515 }
516
517 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
518 {
519         void *dummy;
520         struct peerd *peer = pr->peer;
521         struct mapperd *mapper = __get_mapperd(peer);
522         struct map_node *mn;
523         uint64_t i, pos, max_objidx = calc_map_obj(map);
524         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
525                                                         mapper->mbportno, X_ALLOC);
526         if (!req){
527                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
528                 goto out_err;
529         }
530         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
531                                         mapheader_size + max_objidx * objectsize_in_map);
532         if (r < 0){
533                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
534                 goto out_put;
535         }
536         char *target = xseg_get_target(peer->xseg, req);
537         strncpy(target, map->volume, req->targetlen);
538         char *data = xseg_get_data(peer->xseg, req);
539         mapheader_to_map(map, data);
540         pos = mapheader_size;
541         req->op = X_WRITE;
542         req->size = req->datalen;
543         req->offset = 0;
544
545         if (map->size % block_size)
546                 max_objidx++;
547         for (i = 0; i < max_objidx; i++) {
548                 mn = find_object(map, i);
549                 if (!mn){
550                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551                                         (unsigned long long) i, map->volume);
552                         goto out_put;
553                 }
554                 object_to_map(data+pos, mn);
555                 pos += objectsize_in_map;
556         }
557         r = xseg_set_req_data(peer->xseg, req, pr);
558         if (r < 0){
559                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
560                                 map->volume);
561                 goto out_put;
562         }
563         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
564         if (p == NoPort){
565                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
566                                 map->volume);
567                 goto out_unset;
568         }
569         r = xseg_signal(peer->xseg, p);
570         if (r < 0)
571                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
572
573         map->flags |= MF_MAP_WRITING;
574         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
575         return req;
576
577 out_unset:
578         xseg_get_req_data(peer->xseg, req, &dummy);
579 out_put:
580         xseg_put_request(peer->xseg, req, pr->portno);
581 out_err:
582         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
583         return NULL;
584 }
585
586 static int write_map(struct peer_req* pr, struct map *map)
587 {
588         int r = 0;
589         struct peerd *peer = pr->peer;
590         map->flags |= MF_MAP_WRITING;
591         struct xseg_request *req = __write_map(pr, map);
592         if (!req)
593                 return -1;
594         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595         if (req->state & XS_FAILED)
596                 r = -1;
597         xseg_put_request(peer->xseg, req, pr->portno);
598         map->flags &= ~MF_MAP_WRITING;
599         return r;
600 }
601
602 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
603 {
604         int r;
605         xport p;
606         struct xseg_request *req;
607         struct peerd *peer = pr->peer;
608         struct mapperd *mapper = __get_mapperd(peer);
609         void *dummy;
610
611         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
612
613         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
614         if (!req){
615                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
616                                 m->volume);
617                 goto out_fail;
618         }
619
620         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
621         if (r < 0){
622                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
623                                 m->volume);
624                 goto out_put;
625         }
626
627         char *reqtarget = xseg_get_target(peer->xseg, req);
628         if (!reqtarget)
629                 goto out_put;
630         strncpy(reqtarget, m->volume, req->targetlen);
631         req->op = X_READ;
632         req->size = block_size;
633         req->offset = 0;
634         r = xseg_set_req_data(peer->xseg, req, pr);
635         if (r < 0){
636                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
637                                 m->volume);
638                 goto out_put;
639         }
640         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
641         if (p == NoPort){ 
642                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
643                                 m->volume);
644                 goto out_unset;
645         }
646         r = xseg_signal(peer->xseg, p);
647         
648         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
649         return req;
650
651 out_unset:
652         xseg_get_req_data(peer->xseg, req, &dummy);
653 out_put:
654         xseg_put_request(peer->xseg, req, pr->portno);
655 out_fail:
656         return NULL;
657 }
658
659 static int read_map (struct map *map, char *buf)
660 {
661         char nulls[SHA256_DIGEST_SIZE];
662         memset(nulls, 0, SHA256_DIGEST_SIZE);
663
664         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
665         if (r) {
666                 XSEGLOG2(&lc, D, "Read zeros");
667                 //read error;
668                 return -1;
669         }
670         //type 1, our type, type 0 pithos map
671         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
673         uint64_t pos;
674         uint64_t i, nr_objs;
675         struct map_node *map_node;
676         if (type) {
677                 pos = SHA256_DIGEST_SIZE;
678                 map->size = *(uint64_t *) (buf + pos);
679                 pos += sizeof(uint64_t);
680                 nr_objs = map->size / block_size;
681                 if (map->size % block_size)
682                         nr_objs++;
683                 map_node = calloc(nr_objs, sizeof(struct map_node));
684                 if (!map_node)
685                         return -1;
686
687                 for (i = 0; i < nr_objs; i++) {
688                         map_node[i].map = map;
689                         map_node[i].objectidx = i;
690                         map_node[i].waiters = 0;
691                         map_node[i].ref = 1;
692                         map_node[i].cond = st_cond_new(); //FIXME err check;
693                         map_to_object(&map_node[i], buf + pos);
694                         pos += objectsize_in_map;
695                         r = insert_object(map, &map_node[i]); //FIXME error check
696                 }
697         } else {
698                 pos = 0;
699                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
701                 if (!map_node)
702                         return -1;
703                 for (i = 0; i < max_nr_objs; i++) {
704                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
705                                 break;
706                         map_node[i].objectidx = i;
707                         map_node[i].map = map;
708                         map_node[i].waiters = 0;
709                         map_node[i].ref = 1;
710                         map_node[i].cond = st_cond_new(); //FIXME err check;
711                         pithosmap_to_object(&map_node[i], buf + pos);
712                         pos += SHA256_DIGEST_SIZE; 
713                         r = insert_object(map, &map_node[i]); //FIXME error check
714                 }
715                 map->size = i * block_size; 
716         }
717         print_map(map);
718         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
719         return 0;
720
721         //FIXME cleanup on error
722 }
723
724 static int load_map(struct peer_req *pr, struct map *map)
725 {
726         int r = 0;
727         struct xseg_request *req;
728         struct peerd *peer = pr->peer;
729         map->flags |= MF_MAP_LOADING;
730         req = __load_map(pr, map);
731         if (!req)
732                 return -1;
733         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734         map->flags &= ~MF_MAP_LOADING;
735         if (req->state & XS_FAILED){
736                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737                 xseg_put_request(peer->xseg, req, pr->portno);
738                 return -1;
739         }
740         r = read_map(map, xseg_get_data(peer->xseg, req));
741         xseg_put_request(peer->xseg, req, pr->portno);
742         return r;
743 }
744
745 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
746                                                 uint32_t flags)
747 {
748         int r;
749         xport p;
750         struct xseg_request *req;
751         struct peerd *peer = pr->peer;
752         struct mapperd *mapper = __get_mapperd(peer);
753         void *dummy;
754
755         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
756
757         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
758         if (!req){
759                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
760                                 m->volume);
761                 goto out_fail;
762         }
763
764         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
765         if (r < 0){
766                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
767                                 m->volume);
768                 goto out_put;
769         }
770
771         char *reqtarget = xseg_get_target(peer->xseg, req);
772         if (!reqtarget)
773                 goto out_put;
774         strncpy(reqtarget, m->volume, req->targetlen);
775         req->op = X_OPEN;
776         req->size = block_size;
777         req->offset = 0;
778         if (!(flags & MF_FORCE))
779                 req->flags = XF_NOSYNC;
780         r = xseg_set_req_data(peer->xseg, req, pr);
781         if (r < 0){
782                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
783                                 m->volume);
784                 goto out_put;
785         }
786         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
787         if (p == NoPort){ 
788                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
789                                 m->volume);
790                 goto out_unset;
791         }
792         r = xseg_signal(peer->xseg, p);
793         
794         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
795         return req;
796
797 out_unset:
798         xseg_get_req_data(peer->xseg, req, &dummy);
799 out_put:
800         xseg_put_request(peer->xseg, req, pr->portno);
801 out_fail:
802         return NULL;
803 }
804
805 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
806 {
807         int err;
808         struct xseg_request *req;
809         struct peerd *peer = pr->peer;
810
811         map->flags |= MF_MAP_OPENING;
812         req = __open_map(pr, map, flags);
813         if (!req)
814                 return -1;
815         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
816         map->flags &= ~MF_MAP_OPENING;
817         err = req->state & XS_FAILED;
818         xseg_put_request(peer->xseg, req, pr->portno);
819         if (err)
820                 return -1;
821         else 
822                 map->flags |= MF_MAP_EXCLUSIVE;
823         return 0;
824 }
825
826 /*
827  * copy up functions
828  */
829
830 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
831 {
832         int r = 0;
833         if (mn){
834                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
835                 if (r == -XHASH_ERESIZE) {
836                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
837                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
838                         if (!new_hashmap)
839                                 goto out;
840                         mio->copyups_nodes = new_hashmap;
841                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
842                 }
843         }
844         else {
845                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
846                 if (r == -XHASH_ERESIZE) {
847                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
848                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
849                         if (!new_hashmap)
850                                 goto out;
851                         mio->copyups_nodes = new_hashmap;
852                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
853                 }
854         }
855 out:
856         return r;
857 }
858
859 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
860 {
861         struct map_node *mn;
862         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
863         if (r < 0)
864                 return NULL;
865         return mn;
866 }
867
868 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
869 {
870         struct mapperd *mapper = __get_mapperd(peer);
871         struct mapper_io *mio = __get_mapper_io(pr);
872         struct map *map = mn->map;
873         void *dummy;
874         int r = -1, i;
875         xport p;
876
877         //struct sha256_ctx sha256ctx;
878         uint32_t newtargetlen;
879         char new_target[XSEG_MAX_TARGETLEN + 1]; 
880         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
881         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
882         strncpy(new_object, map->volume, map->volumelen);
883         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
884         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
885
886         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
887         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
888                 sprintf (new_target + 2*i, "%02x", buf[i]);
889         newtargetlen = SHA256_DIGEST_SIZE  * 2;
890
891         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
892                 goto copyup_zeroblock;
893
894         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
895                                                         mapper->bportno, X_ALLOC);
896         if (!req){
897                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
898                 goto out_err;
899         }
900         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
901                                 sizeof(struct xseg_request_copy));
902         if (r < 0){
903                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
904                 goto out_put;
905         }
906
907         char *target = xseg_get_target(peer->xseg, req);
908         strncpy(target, new_target, req->targetlen);
909
910         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
911         strncpy(xcopy->target, mn->object, mn->objectlen);
912         xcopy->targetlen = mn->objectlen;
913
914         req->offset = 0;
915         req->size = block_size;
916         req->op = X_COPY;
917         r = xseg_set_req_data(peer->xseg, req, pr);
918         if (r<0){
919                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
920                 goto out_put;
921         }
922         r = __set_copyup_node(mio, req, mn);
923         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
924         if (p == NoPort) {
925                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
926                 goto out_unset;
927         }
928         xseg_signal(peer->xseg, p);
929 //      mio->copyups++;
930
931         mn->flags |= MF_OBJECT_COPYING;
932         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
933         return req;
934
935 out_unset:
936         r = __set_copyup_node(mio, req, NULL);
937         xseg_get_req_data(peer->xseg, req, &dummy);
938 out_put:
939         xseg_put_request(peer->xseg, req, pr->portno);
940 out_err:
941         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
942         return NULL;
943
944 copyup_zeroblock:
945         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
946                         "Proceeding in writing the new object in map");
947         /* construct a tmp map_node for writing purposes */
948         struct map_node newmn = *mn;
949         newmn.flags = MF_OBJECT_EXIST;
950         strncpy(newmn.object, new_target, newtargetlen);
951         newmn.object[newtargetlen] = 0;
952         newmn.objectlen = newtargetlen;
953         newmn.objectidx = mn->objectidx; 
954         req = object_write(peer, pr, map, &newmn);
955         r = __set_copyup_node(mio, req, mn);
956         if (!req){
957                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
958                                 "\n\t of map %s [%llu]",
959                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
960                 return NULL;
961         }
962         mn->flags |= MF_OBJECT_WRITING;
963         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
964         return req;
965 }
966
967 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
968 {
969         void *dummy;
970         struct peerd *peer = pr->peer;
971         struct mapperd *mapper = __get_mapperd(peer);
972         struct mapper_io *mio = __get_mapper_io(pr);
973         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
974                                                         mapper->bportno, X_ALLOC);
975         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
976         if (!req){
977                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
978                 goto out_err;
979         }
980         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
981         if (r < 0){
982                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
983                 goto out_put;
984         }
985         char *target = xseg_get_target(peer->xseg, req);
986         strncpy(target, mn->object, req->targetlen);
987         req->op = X_DELETE;
988         req->size = req->datalen;
989         req->offset = 0;
990         r = xseg_set_req_data(peer->xseg, req, pr);
991         if (r < 0){
992                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
993                 goto out_put;
994         }
995         __set_copyup_node(mio, req, mn);
996         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
997         if (p == NoPort){
998                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
999                 goto out_unset;
1000         }
1001         r = xseg_signal(peer->xseg, p);
1002         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1003         return req;
1004
1005 out_unset:
1006         xseg_get_req_data(peer->xseg, req, &dummy);
1007 out_put:
1008         xseg_put_request(peer->xseg, req, pr->portno);
1009 out_err:
1010         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1011         return NULL;
1012 }
1013
1014 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1015 {
1016         void *dummy;
1017         struct peerd *peer = pr->peer;
1018         struct mapperd *mapper = __get_mapperd(peer);
1019         struct mapper_io *mio = __get_mapper_io(pr);
1020         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1021                                                         mapper->mbportno, X_ALLOC);
1022         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1023         if (!req){
1024                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1025                 goto out_err;
1026         }
1027         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1028         if (r < 0){
1029                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1030                 goto out_put;
1031         }
1032         char *target = xseg_get_target(peer->xseg, req);
1033         strncpy(target, map->volume, req->targetlen);
1034         req->op = X_DELETE;
1035         req->size = req->datalen;
1036         req->offset = 0;
1037         r = xseg_set_req_data(peer->xseg, req, pr);
1038         if (r < 0){
1039                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1040                 goto out_put;
1041         }
1042         __set_copyup_node(mio, req, NULL);
1043         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1044         if (p == NoPort){
1045                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1046                 goto out_unset;
1047         }
1048         r = xseg_signal(peer->xseg, p);
1049         map->flags |= MF_MAP_DELETING;
1050         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1051         return req;
1052
1053 out_unset:
1054         xseg_get_req_data(peer->xseg, req, &dummy);
1055 out_put:
1056         xseg_put_request(peer->xseg, req, pr->portno);
1057 out_err:
1058         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1059         return  NULL;
1060 }
1061
1062
1063 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1064 {
1065         struct map_node *mn = find_object(map, index);
1066         if (mn)
1067                 mn->ref++;
1068         return mn;
1069 }
1070
1071 static inline void put_mapnode(struct map_node *mn)
1072 {
1073         mn->ref--;
1074         if (!mn->ref){
1075                 //clean up mn
1076                 st_cond_destroy(mn->cond);
1077         }
1078 }
1079
1080 static inline void __get_map(struct map *map)
1081 {
1082         map->ref++;
1083 }
1084
1085 static inline void put_map(struct map *map)
1086 {
1087         struct map_node *mn;
1088         map->ref--;
1089         if (!map->ref){
1090                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1091                 //clean up map
1092                 uint64_t i;
1093                 for (i = 0; i < calc_map_obj(map); i++) {
1094                         mn = get_mapnode(map, i);
1095                         if (mn) {
1096                                 //make sure all pending operations on all objects are completed
1097                                 if (mn->flags & MF_OBJECT_NOT_READY){
1098                                         //this should never happen...
1099                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1100                                 }
1101                                 mn->flags &= MF_OBJECT_DESTROYED;
1102                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1103                                 put_mapnode(mn); //matcing get_mapnode;
1104                                 //assert mn->ref == 0;
1105                         }
1106                 }
1107                 mn = find_object(map, 0);
1108                 if (mn)
1109                         free(mn);
1110                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1111                 free(map);
1112         }
1113 }
1114
1115 static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1116 {
1117         int r;
1118         struct map *m = malloc(sizeof(struct map));
1119         if (!m){
1120                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1121                 goto out_err;
1122         }
1123         m->size = -1;
1124         strncpy(m->volume, name, namelen);
1125         m->volume[namelen] = 0;
1126         m->volumelen = namelen;
1127         m->flags = 0;
1128         m->objects = xhash_new(3, INTEGER); 
1129         if (!m->objects){
1130                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1131                                 m->volume);
1132                 goto out_map;
1133         }
1134         m->ref = 1;
1135         m->waiters = 0;
1136         m->cond = st_cond_new(); //FIXME err check;
1137         r = insert_map(mapper, m);
1138         if (r < 0){
1139                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1140                 goto out_hash;
1141         }
1142
1143         return m;
1144
1145 out_hash:
1146         xhash_free(m->objects);
1147 out_map:
1148         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1149         free(m);
1150 out_err:
1151         return NULL;
1152 }
1153
1154
1155
1156 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1157 {
1158         struct peerd *peer = pr->peer;
1159         struct mapperd *mapper = __get_mapperd(peer);
1160         (void)mapper;
1161         struct mapper_io *mio = __get_mapper_io(pr);
1162         struct map_node *mn = __get_copyup_node(mio, req);
1163
1164         mio->del_pending--;
1165         if (req->state & XS_FAILED){
1166                 mio->err = 1;
1167         }
1168         signal_mapnode(mn);
1169         xseg_put_request(peer->xseg, req, pr->portno);
1170         signal_pr(pr);
1171 }
1172
1173 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1174 {
1175         struct peerd *peer = pr->peer;
1176         struct mapperd *mapper = __get_mapperd(peer);
1177         (void)mapper;
1178         struct mapper_io *mio = __get_mapper_io(pr);
1179         struct map_node *mn = __get_copyup_node(mio, req);
1180         if (!mn){
1181                 XSEGLOG2(&lc, E, "Cannot get map node");
1182                 goto out_err;
1183         }
1184         __set_copyup_node(mio, req, NULL);
1185
1186         if (req->state & XS_FAILED){
1187                 XSEGLOG2(&lc, E, "Req failed");
1188                 mn->flags &= ~MF_OBJECT_COPYING;
1189                 mn->flags &= ~MF_OBJECT_WRITING;
1190                 goto out_err;
1191         }
1192         if (req->op == X_WRITE) {
1193                 char *target = xseg_get_target(peer->xseg, req);
1194                 (void)target;
1195                 //printf("handle object write replyi\n");
1196                 __set_copyup_node(mio, req, NULL);
1197                 //assert mn->flags & MF_OBJECT_WRITING
1198                 mn->flags &= ~MF_OBJECT_WRITING;
1199                 
1200                 struct map_node tmp;
1201                 char *data = xseg_get_data(peer->xseg, req);
1202                 map_to_object(&tmp, data);
1203                 mn->flags |= MF_OBJECT_EXIST;
1204                 if (mn->flags != MF_OBJECT_EXIST){
1205                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1206                         goto out_err;
1207                 }
1208                 //assert mn->flags & MF_OBJECT_EXIST
1209                 strncpy(mn->object, tmp.object, tmp.objectlen);
1210                 mn->object[tmp.objectlen] = 0;
1211                 mn->objectlen = tmp.objectlen;
1212                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1213                 mio->copyups--;
1214                 signal_mapnode(mn);
1215         } else if (req->op == X_COPY) {
1216         //      issue write_object;
1217                 mn->flags &= ~MF_OBJECT_COPYING;
1218                 struct map *map = mn->map;
1219                 if (!map){
1220                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1221                         goto out_err;
1222                 }
1223
1224                 /* construct a tmp map_node for writing purposes */
1225                 char *target = xseg_get_target(peer->xseg, req);
1226                 struct map_node newmn = *mn;
1227                 newmn.flags = MF_OBJECT_EXIST;
1228                 strncpy(newmn.object, target, req->targetlen);
1229                 newmn.object[req->targetlen] = 0;
1230                 newmn.objectlen = req->targetlen;
1231                 newmn.objectidx = mn->objectidx; 
1232                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1233                 if (!xreq){
1234                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1235                                         "\n\t of map %s [%llu]",
1236                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1237                         goto out_err;
1238                 }
1239                 mn->flags |= MF_OBJECT_WRITING;
1240                 __set_copyup_node (mio, xreq, mn);
1241
1242                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1243         } else {
1244                 //wtf??
1245                 ;
1246         }
1247
1248 out:
1249         xseg_put_request(peer->xseg, req, pr->portno);
1250         return;
1251
1252 out_err:
1253         mio->copyups--;
1254         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1255         mio->err = 1;
1256         if (mn)
1257                 signal_mapnode(mn);
1258         goto out;
1259
1260 }
1261
1262 struct r2o {
1263         struct map_node *mn;
1264         uint64_t offset;
1265         uint64_t size;
1266 };
1267
1268 static int req2objs(struct peer_req *pr, struct map *map, int write)
1269 {
1270         int r = 0;
1271         struct peerd *peer = pr->peer;
1272         struct mapper_io *mio = __get_mapper_io(pr);
1273         char *target = xseg_get_target(peer->xseg, pr->req);
1274         uint32_t nr_objs = calc_nr_obj(pr->req);
1275         uint64_t size = sizeof(struct xseg_reply_map) + 
1276                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1277         uint32_t idx, i, ready;
1278         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1279         struct map_node *mn;
1280         mio->copyups = 0;
1281         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1282
1283         /* get map_nodes of request */
1284         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1285         if (!mns){
1286                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1287                 return -1;
1288         }
1289         idx = 0;
1290         rem_size = pr->req->size;
1291         obj_index = pr->req->offset / block_size;
1292         obj_offset = pr->req->offset & (block_size -1); //modulo
1293         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1294         mn = get_mapnode(map, obj_index);
1295         if (!mn) {
1296                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1297                 r = -1;
1298                 goto out;
1299         }
1300         mns[idx].mn = mn;
1301         mns[idx].offset = obj_offset;
1302         mns[idx].size = obj_size;
1303         rem_size -= obj_size;
1304         while (rem_size > 0) {
1305                 idx++;
1306                 obj_index++;
1307                 obj_offset = 0;
1308                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1309                 rem_size -= obj_size;
1310                 mn = get_mapnode(map, obj_index);
1311                 if (!mn) {
1312                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1313                         r = -1;
1314                         goto out;
1315                 }
1316                 mns[idx].mn = mn;
1317                 mns[idx].offset = obj_offset;
1318                 mns[idx].size = obj_size;
1319         }
1320         if (write) {
1321                 ready = 0;
1322                 int can_wait = 0;
1323                 mio->cb=copyup_cb;
1324                 while (ready < (idx + 1)){
1325                         ready = 0;
1326                         for (i = 0; i < (idx+1); i++) {
1327                                 mn = mns[i].mn;
1328                                 //do copyups
1329                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1330                                         if (can_wait){
1331                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1332                                                 if (mn->flags & MF_OBJECT_DELETED){
1333                                                         mio->err = 1;
1334                                                 }
1335                                                 if (mio->err){
1336                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1337                                                         if (!mio->copyups){
1338                                                                 r = -1;
1339                                                                 goto out;
1340                                                         }
1341                                                 }
1342                                         }
1343                                 }
1344                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1345                                         //calc new_target, copy up object
1346                                         if (copyup_object(peer, mn, pr) == NULL){
1347                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1348                                                 mio->err = 1;
1349                                         } else {
1350                                                 mio->copyups++;
1351                                         }
1352                                 } else {
1353                                         ready++;
1354                                 }
1355                         }
1356                         can_wait = 1;
1357                 }
1358                 /*
1359 pending_copyups:
1360                 while(mio->copyups > 0){
1361                         mio->cb = copyup_cb;
1362                         wait_on_pr(pr, 0);
1363                         ta--;
1364                         st_cond_wait(pr->cond);
1365                 }
1366                 */
1367         }
1368
1369         if (mio->err){
1370                 r = -1;
1371                 XSEGLOG2(&lc, E, "Mio->err");
1372                 goto out;
1373         }
1374
1375         /* resize request to fit reply */
1376         char buf[XSEG_MAX_TARGETLEN];
1377         strncpy(buf, target, pr->req->targetlen);
1378         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1379         if (r < 0) {
1380                 XSEGLOG2(&lc, E, "Cannot resize request");
1381                 goto out;
1382         }
1383         target = xseg_get_target(peer->xseg, pr->req);
1384         strncpy(target, buf, pr->req->targetlen);
1385
1386         /* structure reply */
1387         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1388         reply->cnt = nr_objs;
1389         for (i = 0; i < (idx+1); i++) {
1390                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1391                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1392                 reply->segs[i].offset = mns[i].offset;
1393                 reply->segs[i].size = mns[i].size;
1394         }
1395 out:
1396         for (i = 0; i < idx; i++) {
1397                 put_mapnode(mns[i].mn);
1398         }
1399         free(mns);
1400         return r;
1401 }
1402
1403 static int do_dropcache(struct peer_req *pr, struct map *map)
1404 {
1405         struct map_node *mn;
1406         struct peerd *peer = pr->peer;
1407         struct mapperd *mapper = __get_mapperd(peer);
1408         uint64_t i;
1409         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1410         map->flags |= MF_MAP_DROPPING_CACHE;
1411         for (i = 0; i < calc_map_obj(map); i++) {
1412                 mn = get_mapnode(map, i);
1413                 if (mn) {
1414                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1415                                 //make sure all pending operations on all objects are completed
1416                                 if (mn->flags & MF_OBJECT_NOT_READY){
1417                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1418                                 }
1419                                 mn->flags &= MF_OBJECT_DESTROYED;
1420                         }
1421                         put_mapnode(mn);
1422                 }
1423         }
1424         map->flags &= ~MF_MAP_DROPPING_CACHE;
1425         map->flags |= MF_MAP_DESTROYED;
1426         remove_map(mapper, map);
1427         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1428         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1429         return 0;
1430 }
1431
1432 static int do_info(struct peer_req *pr, struct map *map)
1433 {
1434         struct peerd *peer = pr->peer;
1435         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1436         xinfo->size = map->size;
1437         return 0;
1438 }
1439
1440
1441 static int do_close(struct peer_req *pr, struct map *map)
1442 {
1443 //      struct peerd *peer = pr->peer;
1444 //      struct xseg_request *req;
1445         if (map->flags & MF_MAP_EXCLUSIVE) 
1446                 close_map(pr, map);
1447         return do_dropcache(pr, map);
1448 }
1449
1450 static int do_destroy(struct peer_req *pr, struct map *map)
1451 {
1452         uint64_t i;
1453         struct peerd *peer = pr->peer;
1454         struct mapper_io *mio = __get_mapper_io(pr);
1455         struct map_node *mn;
1456         struct xseg_request *req;
1457         
1458         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1459         map->flags |= MF_MAP_DELETING;
1460         req = delete_map(pr, map);
1461         if (!req)
1462                 return -1;
1463         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1464         if (req->state & XS_FAILED){
1465                 xseg_put_request(peer->xseg, req, pr->portno);
1466                 map->flags &= ~MF_MAP_DELETING;
1467                 return -1;
1468         }
1469         xseg_put_request(peer->xseg, req, pr->portno);
1470         //FIXME
1471         uint64_t nr_obj = calc_map_obj(map);
1472         uint64_t deleted = 0;
1473         while (deleted < nr_obj){ 
1474                 deleted = 0;
1475                 for (i = 0; i < nr_obj; i++){
1476                         mn = get_mapnode(map, i);
1477                         if (mn) {
1478                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1479                                         if (mn->flags & MF_OBJECT_EXIST){
1480                                                 //make sure all pending operations on all objects are completed
1481                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1482                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1483                                                 }
1484                                                 req = delete_object(pr, mn);
1485                                                 if (!req)
1486                                                         if (mio->del_pending){
1487                                                                 goto wait_pending;
1488                                                         } else {
1489                                                                 continue;
1490                                                         }
1491                                                 else {
1492                                                         mio->del_pending++;
1493                                                 }
1494                                         }
1495                                         mn->flags &= MF_OBJECT_DESTROYED;
1496                                 }
1497                                 put_mapnode(mn);
1498                         }
1499                         deleted++;
1500                 }
1501 wait_pending:
1502                 mio->cb = deletion_cb;
1503                 wait_on_pr(pr, mio->del_pending > 0);
1504         }
1505         mio->cb = NULL;
1506         map->flags &= ~MF_MAP_DELETING;
1507         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1508         return do_close(pr, map);
1509 }
1510
1511 static int do_mapr(struct peer_req *pr, struct map *map)
1512 {
1513         struct peerd *peer = pr->peer;
1514         int r = req2objs(pr, map, 0);
1515         if  (r < 0){
1516                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1517                                 map->volume, 
1518                                 (unsigned long long) pr->req->offset, 
1519                                 (unsigned long long) (pr->req->offset + pr->req->size));
1520                 return -1;
1521         }
1522         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1523                         map->volume, 
1524                         (unsigned long long) pr->req->offset, 
1525                         (unsigned long long) (pr->req->offset + pr->req->size));
1526         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1527                         (unsigned long long) pr->req->offset,
1528                         (unsigned long long) pr->req->size);
1529         char buf[XSEG_MAX_TARGETLEN+1];
1530         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1531         int i;
1532         for (i = 0; i < reply->cnt; i++) {
1533                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1534                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1535                 buf[reply->segs[i].targetlen] = 0;
1536                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1537                                 (unsigned long long) reply->segs[i].offset,
1538                                 (unsigned long long) reply->segs[i].size);
1539         }
1540         return 0;
1541 }
1542
1543 static int do_mapw(struct peer_req *pr, struct map *map)
1544 {
1545         struct peerd *peer = pr->peer;
1546         int r = req2objs(pr, map, 1);
1547         if  (r < 0){
1548                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1549                                 map->volume, 
1550                                 (unsigned long long) pr->req->offset, 
1551                                 (unsigned long long) (pr->req->offset + pr->req->size));
1552                 return -1;
1553         }
1554         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1555                         map->volume, 
1556                         (unsigned long long) pr->req->offset, 
1557                         (unsigned long long) (pr->req->offset + pr->req->size));
1558         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1559                         (unsigned long long) pr->req->offset,
1560                         (unsigned long long) pr->req->size);
1561         char buf[XSEG_MAX_TARGETLEN+1];
1562         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1563         int i;
1564         for (i = 0; i < reply->cnt; i++) {
1565                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1566                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1567                 buf[reply->segs[i].targetlen] = 0;
1568                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1569                                 (unsigned long long) reply->segs[i].offset,
1570                                 (unsigned long long) reply->segs[i].size);
1571         }
1572         return 0;
1573 }
1574
1575 //here map is the parent map
1576 static int do_clone(struct peer_req *pr, struct map *map)
1577 {
1578         /*
1579         FIXME check if clone map exists
1580         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1581         if (clonemap)
1582                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1583                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1584                 return -1;
1585         */
1586
1587         int r;
1588         char buf[XSEG_MAX_TARGETLEN];
1589         struct peerd *peer = pr->peer;
1590         struct mapperd *mapper = __get_mapperd(peer);
1591         char *target = xseg_get_target(peer->xseg, pr->req);
1592         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1593         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1594         struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1595         if (!clonemap) 
1596                 return -1;
1597
1598         if (xclone->size == -1)
1599                 clonemap->size = map->size;
1600         else
1601                 clonemap->size = xclone->size;
1602         if (clonemap->size < map->size){
1603                 target = xseg_get_target(peer->xseg, pr->req);
1604                 strncpy(buf, target, pr->req->targetlen);
1605                 buf[pr->req->targetlen] = 0;
1606                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1607                                 "\n\t for requested clone %s",
1608                                 (unsigned long long) xclone->size,
1609                                 (unsigned long long) map->size, buf);
1610                 goto out_err;
1611         }
1612         
1613         //alloc and init map_nodes
1614         unsigned long c = clonemap->size/block_size + 1;
1615         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1616         if (!map_nodes){
1617                 goto out_err;
1618         }
1619         int i;
1620         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1621                 struct map_node *mn = get_mapnode(map, i);
1622                 if (mn) {
1623                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1624                         map_nodes[i].objectlen = mn->objectlen;
1625                         put_mapnode(mn);
1626                 } else {
1627                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1628                         map_nodes[i].objectlen = strlen(zero_block);
1629                 }
1630                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1631                 map_nodes[i].flags = 0;
1632                 map_nodes[i].objectidx = i;
1633                 map_nodes[i].map = clonemap;
1634                 map_nodes[i].ref = 1;
1635                 map_nodes[i].waiters = 0;
1636                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1637                 r = insert_object(clonemap, &map_nodes[i]);
1638                 if (r < 0){
1639                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1640                         goto out_err;
1641                 }
1642         }
1643         r = write_map(pr, clonemap);
1644         if (r < 0){
1645                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1646                 goto out_err;
1647         }
1648         return 0;
1649
1650 out_err:
1651         put_map(clonemap);
1652         return -1;
1653 }
1654
1655 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1656 {
1657         int r, opened = 0;
1658         if (flags & MF_EXCLUSIVE){
1659                 r = open_map(pr, map, flags);
1660                 if (r < 0) {
1661                         if (flags & MF_FORCE){
1662                                 return -1;
1663                         }
1664                 } else {
1665                         opened = 1;
1666                 }
1667         }
1668         r = load_map(pr, map);
1669         if (r < 0 && opened){
1670                 close_map(pr, map);
1671         }
1672         return r;
1673 }
1674
1675 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1676 {
1677         int r;
1678         struct peerd *peer = pr->peer;
1679         struct mapperd *mapper = __get_mapperd(peer);
1680         struct map *map = find_map(mapper, name, namelen);
1681         if (!map){
1682                 if (flags & MF_LOAD){
1683                         map = create_map(mapper, name, namelen);
1684                         if (!map)
1685                                 return NULL;
1686                         r = open_load_map(pr, map, flags);
1687                         if (r < 0){
1688                                 do_dropcache(pr, map);
1689                                 return NULL;
1690                         }
1691                 } else {
1692                         return NULL;
1693                 }
1694         } else if (map->flags & MF_MAP_DESTROYED){
1695                 return NULL;
1696         }
1697         __get_map(map);
1698         return map;
1699
1700 }
1701
1702 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1703                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1704 {
1705         //struct peerd *peer = pr->peer;
1706         struct map *map;
1707 start:
1708         map = get_map(pr, name, namelen, flags);
1709         if (!map)
1710                 return -1;
1711         if (map->flags & MF_MAP_NOT_READY){
1712                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1713                 put_map(map);
1714                 goto start;
1715         }
1716         int r = action(pr, map);
1717         //always drop cache if map not read exclusively
1718         if (!(map->flags & MF_MAP_EXCLUSIVE))
1719                 do_dropcache(pr, map);
1720         //maybe capture ref before and compare here?
1721         if (map->ref > 1){
1722                 signal_map(map);
1723         }
1724         put_map(map);
1725         return r;
1726 }
1727
1728 void * handle_info(struct peer_req *pr)
1729 {
1730         struct peerd *peer = pr->peer;
1731         char *target = xseg_get_target(peer->xseg, pr->req);
1732         int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
1733         if (r < 0)
1734                 fail(peer, pr);
1735         else
1736                 complete(peer, pr);
1737         ta--;
1738         return NULL;
1739 }
1740
1741 void * handle_clone(struct peer_req *pr)
1742 {
1743         int r;
1744         struct peerd *peer = pr->peer;
1745         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1746         if (!xclone) {
1747                 r = -1;
1748                 goto out;
1749         }
1750         if (xclone->targetlen){
1751                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
1752         } else {
1753                 if (!xclone->size){
1754                         r = -1;
1755                 } else {
1756                         struct map *map;
1757                         char *target = xseg_get_target(peer->xseg, pr->req);
1758                         XSEGLOG2(&lc, I, "Creating volume");
1759                         map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
1760                         if (map){
1761                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1762                                 if (map->ref <= 2) //initial one + one ref from __get_map
1763                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1764                                 put_map(map); //matches get_map
1765                                 r = -1;
1766                                 goto out;
1767                         }
1768                         //create a new empty map of size
1769                         map = create_map(mapper, target, pr->req->targetlen);
1770                         if (!map){
1771                                 r = -1;
1772                                 goto out;
1773                         }
1774                         map->size = xclone->size;
1775                         //populate_map with zero objects;
1776                         uint64_t nr_objs = xclone->size / block_size;
1777                         if (xclone->size % block_size)
1778                                 nr_objs++;
1779                                 
1780                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1781                         if (!map_nodes){
1782                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1783                                 r = -1;
1784                                 goto out;
1785                         }
1786                         uint64_t i;
1787                         for (i = 0; i < nr_objs; i++) {
1788                                 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1789                                 map_nodes[i].objectlen = strlen(zero_block);
1790                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1791                                 map_nodes[i].flags = 0;
1792                                 map_nodes[i].objectidx = i;
1793                                 map_nodes[i].map = map;
1794                                 map_nodes[i].ref = 1;
1795                                 map_nodes[i].waiters = 0;
1796                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1797                                 r = insert_object(map, &map_nodes[i]);
1798                                 if (r < 0){
1799                                         do_dropcache(pr, map);
1800                                         r = -1;
1801                                         goto out;
1802                                 }
1803                         }
1804                         r = write_map(pr, map);
1805                         if (r < 0){
1806                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1807                                 do_dropcache(pr, map);
1808                                 goto out;
1809                         }
1810                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1811                         r = 0;
1812                         do_dropcache(pr, map); //drop cache here for consistency
1813                 }
1814         }
1815 out:
1816         if (r < 0)
1817                 fail(peer, pr);
1818         else
1819                 complete(peer, pr);
1820         ta--;
1821         return NULL;
1822 }
1823
1824 void * handle_mapr(struct peer_req *pr)
1825 {
1826         struct peerd *peer = pr->peer;
1827         char *target = xseg_get_target(peer->xseg, pr->req);
1828         int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
1829         if (r < 0)
1830                 fail(peer, pr);
1831         else
1832                 complete(peer, pr);
1833         ta--;
1834         return NULL;
1835 }
1836
1837 void * handle_mapw(struct peer_req *pr)
1838 {
1839         struct peerd *peer = pr->peer;
1840         char *target = xseg_get_target(peer->xseg, pr->req);
1841         int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1842         if (r < 0)
1843                 fail(peer, pr);
1844         else
1845                 complete(peer, pr);
1846         XSEGLOG2(&lc, D, "Ta: %d", ta);
1847         ta--;
1848         return NULL;
1849 }
1850
1851 void * handle_destroy(struct peer_req *pr)
1852 {
1853         struct peerd *peer = pr->peer;
1854         char *target = xseg_get_target(peer->xseg, pr->req);
1855         int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1856         if (r < 0)
1857                 fail(peer, pr);
1858         else
1859                 complete(peer, pr);
1860         ta--;
1861         return NULL;
1862 }
1863
1864 void * handle_close(struct peer_req *pr)
1865 {
1866         struct peerd *peer = pr->peer;
1867         char *target = xseg_get_target(peer->xseg, pr->req);
1868         //here we do not want to load
1869         int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
1870         if (r < 0)
1871                 fail(peer, pr);
1872         else
1873                 complete(peer, pr);
1874         ta--;
1875         return NULL;
1876 }
1877
1878 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1879                         struct xseg_request *req)
1880 {
1881         //struct mapperd *mapper = __get_mapperd(peer);
1882         struct mapper_io *mio = __get_mapper_io(pr);
1883         void *(*action)(struct peer_req *) = NULL;
1884
1885         mio->state = ACCEPTED;
1886         mio->err = 0;
1887         mio->cb = NULL;
1888         switch (pr->req->op) {
1889                 /* primary xseg operations of mapper */
1890                 case X_CLONE: action = handle_clone; break;
1891                 case X_MAPR: action = handle_mapr; break;
1892                 case X_MAPW: action = handle_mapw; break;
1893 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1894                 case X_INFO: action = handle_info; break;
1895                 case X_DELETE: action = handle_destroy; break;
1896                 case X_CLOSE: action = handle_close; break;
1897                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1898         }
1899         if (action){
1900                 ta++;
1901                 mio->active = 1;
1902                 st_thread_create(action, pr, 0, 0);
1903         }
1904         return 0;
1905
1906 }
1907
1908 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1909                 enum dispatch_reason reason)
1910 {
1911         struct mapperd *mapper = __get_mapperd(peer);
1912         (void) mapper;
1913         struct mapper_io *mio = __get_mapper_io(pr);
1914         (void) mio;
1915
1916
1917         if (reason == dispatch_accept)
1918                 dispatch_accepted(peer, pr, req);
1919         else {
1920                 if (mio->cb){
1921                         mio->cb(pr, req);
1922                 } else { 
1923                         signal_pr(pr);
1924                 }
1925         }
1926         return 0;
1927 }
1928
1929 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1930 {
1931         int i;
1932         unsigned char buf[SHA256_DIGEST_SIZE];
1933         unsigned char *zero;
1934
1935         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1936
1937         /* Version check should be the very first call because it
1938           makes sure that important subsystems are intialized. */
1939         gcry_check_version (NULL);
1940      
1941         /* Disable secure memory.  */
1942         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1943      
1944         /* Tell Libgcrypt that initialization has completed. */
1945         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1946
1947         /* calculate out magic sha hash value */
1948         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1949
1950         /* calculate zero block */
1951         //FIXME check hash value
1952         zero = malloc(block_size);
1953         memset(zero, 0, block_size);
1954         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1955         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1956                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1957         printf("%s \n", zero_block);
1958         free(zero);
1959
1960         //FIXME error checks
1961         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1962         peer->priv = mapperd;
1963         mapper = mapperd;
1964         mapper->hashmaps = xhash_new(3, STRING);
1965         
1966         for (i = 0; i < peer->nr_ops; i++) {
1967                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1968                 mio->copyups_nodes = xhash_new(3, INTEGER);
1969                 mio->copyups = 0;
1970                 mio->err = 0;
1971                 mio->active = 0;
1972                 peer->peer_reqs[i].priv = mio;
1973         }
1974
1975         for (i = 0; i < argc; i++) {
1976                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1977                         mapper->bportno = atoi(argv[i+1]);
1978                         i += 1;
1979                         continue;
1980                 }
1981                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1982                         mapper->mbportno = atoi(argv[i+1]);
1983                         i += 1;
1984                         continue;
1985                 }
1986                 /* enforce only one thread */
1987                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1988                         int t = atoi(argv[i+1]);
1989                         if (t != 1) {
1990                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1991                                 return -1;
1992                         }
1993                         i += 1;
1994                         continue;
1995                 }
1996         }
1997
1998         const struct sched_param param = { .sched_priority = 99 };
1999         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2000
2001
2002 //      test_map(peer);
2003
2004         return 0;
2005 }
2006
2007 void print_obj(struct map_node *mn)
2008 {
2009         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2010                         (unsigned long long) mn->objectidx, mn->object, 
2011                         (unsigned int) mn->objectlen, 
2012                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2013 }
2014
2015 void print_map(struct map *m)
2016 {
2017         uint64_t nr_objs = m->size/block_size;
2018         if (m->size % block_size)
2019                 nr_objs++;
2020         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2021                         m->volume, m->volumelen, 
2022                         (unsigned long long) m->size, 
2023                         (unsigned long long) nr_objs);
2024         uint64_t i;
2025         struct map_node *mn;
2026         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2027                 return;
2028         for (i = 0; i < nr_objs; i++) {
2029                 mn = find_object(m, i);
2030                 if (!mn){
2031                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2032                         continue;
2033                 }
2034                 print_obj(mn);
2035         }
2036 }
2037
2038 /*
2039 void test_map(struct peerd *peer)
2040 {
2041         int i,j, ret;
2042         //struct sha256_ctx sha256ctx;
2043         unsigned char buf[SHA256_DIGEST_SIZE];
2044         char buf_new[XSEG_MAX_TARGETLEN + 20];
2045         struct map *m = malloc(sizeof(struct map));
2046         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2047         m->volume[XSEG_MAX_TARGETLEN] = 0;
2048         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2049         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2050         m->volumelen = XSEG_MAX_TARGETLEN;
2051         m->size = 100*block_size;
2052         m->objects = xhash_new(3, INTEGER);
2053         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2054         for (i = 0; i < 100; i++) {
2055                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2056                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2057                 
2058                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2059                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2060                 }
2061                 map_node[i].objectidx = i;
2062                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2063                 map_node[i].flags = MF_OBJECT_EXIST;
2064                 ret = insert_object(m, &map_node[i]);
2065         }
2066
2067         char *data = malloc(block_size);
2068         mapheader_to_map(m, data);
2069         uint64_t pos = mapheader_size;
2070
2071         for (i = 0; i < 100; i++) {
2072                 map_node = find_object(m, i);
2073                 if (!map_node){
2074                         printf("no object node %d \n", i);
2075                         exit(1);
2076                 }
2077                 object_to_map(data+pos, map_node);
2078                 pos += objectsize_in_map;
2079         }
2080 //      print_map(m);
2081
2082         struct map *m2 = malloc(sizeof(struct map));
2083         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2084         m->volume[XSEG_MAX_TARGETLEN] = 0;
2085         m->volumelen = XSEG_MAX_TARGETLEN;
2086
2087         m2->objects = xhash_new(3, INTEGER);
2088         ret = read_map(peer, m2, data);
2089 //      print_map(m2);
2090
2091         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2092         ssize_t r, sum = 0;
2093         while (sum < block_size) {
2094                 r = write(fd, data + sum, block_size -sum);
2095                 if (r < 0){
2096                         perror("write");
2097                         printf("write error\n");
2098                         exit(1);
2099                 } 
2100                 sum += r;
2101         }
2102         close(fd);
2103         map_node = find_object(m, 0);
2104         free(map_node);
2105         free(m);
2106 }
2107 */