better printing on heap and lock status
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22
23 #define SHA256_DIGEST_SIZE 32
24 /* hex representation of sha256 value takes up double the sha256 size */
25 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26
27 #define block_size (1<<22) //FIXME this should be defined here?
28 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
30
31 #define MF_OBJECT_EXIST         (1 << 0)
32 #define MF_OBJECT_COPYING       (1 << 1)
33 #define MF_OBJECT_WRITING       (1 << 2)
34 #define MF_OBJECT_DELETING      (1 << 3)
35 #define MF_OBJECT_DELETED       (1 << 4)
36 #define MF_OBJECT_DESTROYED     (1 << 5)
37
38 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
39
40 char *magic_string = "This a magic string. Please hash me";
41 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
42 char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
43
44 //dispatch_internal mapper states
45 enum mapper_state {
46         ACCEPTED = 0,
47         WRITING = 1,
48         COPYING = 2,
49         DELETING = 3,
50         DROPPING_CACHE = 4
51 };
52
53 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
54
55 struct map_node {
56         uint32_t flags;
57         uint32_t objectidx;
58         uint32_t objectlen;
59         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
60         struct map *map;
61         uint32_t ref;
62         uint32_t waiters;
63         st_cond_t cond;
64 };
65
66 #define MF_MAP_LOADING          (1 << 0)
67 #define MF_MAP_DESTROYED        (1 << 1)
68 #define MF_MAP_WRITING          (1 << 2)
69 #define MF_MAP_DELETING         (1 << 3)
70 #define MF_MAP_DROPPING_CACHE   (1 << 4)
71 #define MF_MAP_EXCLUSIVE        (1 << 5)
72 #define MF_MAP_OPENING          (1 << 6)
73 #define MF_MAP_CLOSING          (1 << 7)
74
75 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
77
78 #define wait_on_pr(__pr, __condition__)         \
79         while (__condition__){                  \
80                 ta--;                           \
81                 __get_mapper_io(pr)->active = 0;\
82                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
83                 st_cond_wait(__pr->cond);       \
84         }
85
86 #define wait_on_mapnode(__mn, __condition__)    \
87         while (__condition__){                  \
88                 ta--;                           \
89                 __mn->waiters++;                \
90                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
91                 st_cond_wait(__mn->cond);       \
92         }
93
94 #define wait_on_map(__map, __condition__)       \
95         while (__condition__){                  \
96                 ta--;                           \
97                 __map->waiters++;               \
98                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
99                 st_cond_wait(__map->cond);      \
100         }
101
102 #define signal_pr(__pr)                         \
103         do {                                    \
104                 if (!__get_mapper_io(pr)->active){\
105                         ta++;                   \
106                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
107                         __get_mapper_io(pr)->active = 1;\
108                         st_cond_signal(__pr->cond);     \
109                 }                               \
110         }while(0)
111
112 #define signal_map(__map)                       \
113         do {                                    \
114                 if (__map->waiters) {           \
115                         ta += 1;                \
116                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
117                         __map->waiters--;       \
118                         st_cond_signal(__map->cond);    \
119                 }                               \
120         }while(0)
121
122 #define signal_mapnode(__mn)                    \
123         do {                                    \
124                 if (__mn->waiters) {            \
125                         ta += __mn->waiters;    \
126                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
127                         __mn->waiters = 0;      \
128                         st_cond_broadcast(__mn->cond);  \
129                 }                               \
130         }while(0)
131
132
133 struct map {
134         uint32_t flags;
135         uint64_t size;
136         uint32_t volumelen;
137         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138         xhash_t *objects;       /* obj_index --> map_node */
139         uint32_t ref;
140         uint32_t waiters;
141         st_cond_t cond;
142 };
143
144 struct mapperd {
145         xport bportno;          /* blocker that accesses data */
146         xport mbportno;         /* blocker that accesses maps */
147         xhash_t *hashmaps; // hash_function(target) --> struct map
148 };
149
150 struct mapper_io {
151         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
152         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153         struct map_node *copyup_node;
154         volatile int err;                       /* error flag */
155         volatile uint64_t del_pending;
156         uint64_t delobj;
157         uint64_t dcobj;
158         cb_t cb;
159         enum mapper_state state;
160         volatile int active;
161 };
162
163 /* global vars */
164 struct mapperd *mapper;
165
166 void print_map(struct map *m);
167
168 /*
169  * Helper functions
170  */
171
172 static inline struct mapperd * __get_mapperd(struct peerd *peer)
173 {
174         return (struct mapperd *) peer->priv;
175 }
176
177 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
178 {
179         return (struct mapper_io *) pr->priv;
180 }
181
182 static inline uint64_t calc_map_obj(struct map *map)
183 {
184         if (map->size == -1)
185                 return 0;
186         uint64_t nr_objs = map->size / block_size;
187         if (map->size % block_size)
188                 nr_objs++;
189         return nr_objs;
190 }
191
192 static uint32_t calc_nr_obj(struct xseg_request *req)
193 {
194         unsigned int r = 1;
195         uint64_t rem_size = req->size;
196         uint64_t obj_offset = req->offset & (block_size -1); //modulo
197         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198         rem_size -= obj_size;
199         while (rem_size > 0) {
200                 obj_size = (rem_size > block_size) ? block_size : rem_size;
201                 rem_size -= obj_size;
202                 r++;
203         }
204
205         return r;
206 }
207
208 /*
209  * Maps handling functions
210  */
211 //FIXME assert targetlen > 0
212
213
214 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
215 {
216         int r;
217         struct map *m = NULL;
218         char buf[XSEG_MAX_TARGETLEN+1];
219         //assert targetlen <= XSEG_MAX_TARGETLEN
220         strncpy(buf, target, targetlen);
221         buf[targetlen] = 0;
222         XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
223         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
224         if (r < 0)
225                 return NULL;
226         return m;
227 }
228
229
230 static int insert_map(struct mapperd *mapper, struct map *map)
231 {
232         int r = -1;
233         
234         if (find_map(mapper, map->volume, map->volumelen)){
235                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
236                 goto out;
237         }
238
239         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
240                         map->volume, strlen(map->volume), (unsigned long) map);
241         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242         while (r == -XHASH_ERESIZE) {
243                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
245                 if (!new_hashmap){
246                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247                                         (unsigned long long) shift);
248                         goto out;
249                 }
250                 mapper->hashmaps = new_hashmap;
251                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
252         }
253 out:
254         return r;
255 }
256
257 static int remove_map(struct mapperd *mapper, struct map *map)
258 {
259         int r = -1;
260         
261         //assert no pending pr on map
262         
263         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264         while (r == -XHASH_ERESIZE) {
265                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
267                 if (!new_hashmap){
268                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269                                         (unsigned long long) shift);
270                         goto out;
271                 }
272                 mapper->hashmaps = new_hashmap;
273                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
274         }
275 out:
276         return r;
277 }
278
279 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
280 {
281         int r;
282         xport p;
283         struct peerd *peer = pr->peer;
284         struct xseg_request *req;
285         struct mapperd *mapper = __get_mapperd(peer);
286         void *dummy;
287
288         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
289
290         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
291         if (!req){
292                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
293                                 map->volume);
294                 goto out_err;
295         }
296
297         r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
298         if (r < 0){
299                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
300                                 map->volume);
301                 goto out_put;
302         }
303
304         char *reqtarget = xseg_get_target(peer->xseg, req);
305         if (!reqtarget)
306                 goto out_put;
307         strncpy(reqtarget, map->volume, req->targetlen);
308         req->op = X_CLOSE;
309         req->size = block_size;
310         req->offset = 0;
311         r = xseg_set_req_data(peer->xseg, req, pr);
312         if (r < 0){
313                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
314                                 map->volume);
315                 goto out_put;
316         }
317         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
318         if (p == NoPort){ 
319                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
320                                 map->volume);
321                 goto out_unset;
322         }
323         r = xseg_signal(peer->xseg, p);
324         
325         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
326         return req;
327
328 out_unset:
329         xseg_get_req_data(peer->xseg, req, &dummy);
330 out_put:
331         xseg_put_request(peer->xseg, req, pr->portno);
332 out_err:
333         return NULL;
334 }
335
336 static int close_map(struct peer_req *pr, struct map *map)
337 {
338         int err;
339         struct xseg_request *req;
340         struct peerd *peer = pr->peer;
341
342         map->flags |= MF_MAP_CLOSING;
343         req = __close_map(pr, map);
344         if (!req)
345                 return -1;
346         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347         map->flags &= ~MF_MAP_CLOSING;
348         err = req->state & XS_FAILED;
349         xseg_put_request(peer->xseg, req, pr->portno);
350         if (err)
351                 return -1;
352         return 0;
353 }
354
355 /*
356 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
357                                 char *target, uint32_t targetlen, struct map **m)
358 {
359         struct mapperd *mapper = __get_mapperd(peer);
360         int r;
361         *m = find_map(mapper, target, targetlen);
362         if (*m) {
363                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364                 if ((*m)->flags & MF_MAP_NOT_READY) {
365                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
366                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
367                         return MF_PENDING;
368                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
369                 //      return -1;
370                 // 
371                 }else {
372                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
373                         return 0;
374                 }
375         }
376         r = open_map(peer, pr, target, targetlen, 0);
377         if (r < 0)
378                 return -1; //error
379         return MF_PENDING;      
380 }
381 */
382 /*
383  * Object handling functions
384  */
385
386 struct map_node *find_object(struct map *map, uint64_t obj_index)
387 {
388         struct map_node *mn;
389         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
390         if (r < 0)
391                 return NULL;
392         return mn;
393 }
394
395 static int insert_object(struct map *map, struct map_node *mn)
396 {
397         //FIXME no find object first
398         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399         if (r == -XHASH_ERESIZE) {
400                 unsigned long shift = xhash_grow_size_shift(map->objects);
401                 map->objects = xhash_resize(map->objects, shift, NULL);
402                 if (!map->objects)
403                         return -1;
404                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
405         }
406         return r;
407 }
408
409
410 /*
411  * map read/write functions 
412  */
413 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
414 {
415         int i;
416         //hexlify sha256 value
417         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418                 sprintf(mn->object+2*i, "%02x", buf[i]);
419         }
420
421         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422         mn->objectlen = SHA256_DIGEST_SIZE * 2;
423         mn->flags = MF_OBJECT_EXIST;
424 }
425
426 static inline void map_to_object(struct map_node *mn, char *buf)
427 {
428         char c = buf[0];
429         mn->flags = 0;
430         if (c)
431                 mn->flags |= MF_OBJECT_EXIST;
432         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433         mn->object[XSEG_MAX_TARGETLEN] = 0;
434         mn->objectlen = strlen(mn->object);
435 }
436
437 static inline void object_to_map(char* buf, struct map_node *mn)
438 {
439         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440         memcpy(buf+1, mn->object, mn->objectlen);
441         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
442 }
443
444 static inline void mapheader_to_map(struct map *m, char *buf)
445 {
446         uint64_t pos = 0;
447         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448         pos += SHA256_DIGEST_SIZE;
449         memcpy(buf + pos, &m->size, sizeof(m->size));
450         pos += sizeof(m->size);
451 }
452
453
454 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
455                                 struct map *map, struct map_node *mn)
456 {
457         void *dummy;
458         struct mapperd *mapper = __get_mapperd(peer);
459         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460                                                         mapper->mbportno, X_ALLOC);
461         if (!req){
462                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
463                                 "(Map: %s [%llu]",
464                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
465                 goto out_err;
466         }
467         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
468         if (r < 0){
469                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
470                                 "(Map: %s [%llu]",
471                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
472                 goto out_put;
473         }
474         char *target = xseg_get_target(peer->xseg, req);
475         strncpy(target, map->volume, req->targetlen);
476         req->size = objectsize_in_map;
477         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
478         req->op = X_WRITE;
479         char *data = xseg_get_data(peer->xseg, req);
480         object_to_map(data, mn);
481
482         r = xseg_set_req_data(peer->xseg, req, pr);
483         if (r < 0){
484                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
485                                 "(Map: %s [%llu]",
486                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
487                 goto out_put;
488         }
489         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
490         if (p == NoPort){
491                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
492                                 "(Map: %s [%llu]",
493                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
494                 goto out_unset;
495         }
496         r = xseg_signal(peer->xseg, p);
497         if (r < 0)
498                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
499
500         XSEGLOG2(&lc, I, "Writing object %s \n\t"
501                         "Map: %s [%llu]",
502                         mn->object, map->volume, (unsigned long long) mn->objectidx);
503
504         return req;
505
506 out_unset:
507         xseg_get_req_data(peer->xseg, req, &dummy);
508 out_put:
509         xseg_put_request(peer->xseg, req, pr->portno);
510 out_err:
511         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
512                         "(Map: %s [%llu]",
513                         mn->object, map->volume, (unsigned long long) mn->objectidx);
514         return NULL;
515 }
516
517 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
518 {
519         void *dummy;
520         struct peerd *peer = pr->peer;
521         struct mapperd *mapper = __get_mapperd(peer);
522         struct map_node *mn;
523         uint64_t i, pos, max_objidx = calc_map_obj(map);
524         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
525                                                         mapper->mbportno, X_ALLOC);
526         if (!req){
527                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
528                 goto out_err;
529         }
530         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
531                                         mapheader_size + max_objidx * objectsize_in_map);
532         if (r < 0){
533                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
534                 goto out_put;
535         }
536         char *target = xseg_get_target(peer->xseg, req);
537         strncpy(target, map->volume, req->targetlen);
538         char *data = xseg_get_data(peer->xseg, req);
539         mapheader_to_map(map, data);
540         pos = mapheader_size;
541         req->op = X_WRITE;
542         req->size = req->datalen;
543         req->offset = 0;
544
545         if (map->size % block_size)
546                 max_objidx++;
547         for (i = 0; i < max_objidx; i++) {
548                 mn = find_object(map, i);
549                 if (!mn){
550                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551                                         (unsigned long long) i, map->volume);
552                         goto out_put;
553                 }
554                 object_to_map(data+pos, mn);
555                 pos += objectsize_in_map;
556         }
557         r = xseg_set_req_data(peer->xseg, req, pr);
558         if (r < 0){
559                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
560                                 map->volume);
561                 goto out_put;
562         }
563         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
564         if (p == NoPort){
565                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
566                                 map->volume);
567                 goto out_unset;
568         }
569         r = xseg_signal(peer->xseg, p);
570         if (r < 0)
571                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
572
573         map->flags |= MF_MAP_WRITING;
574         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
575         return req;
576
577 out_unset:
578         xseg_get_req_data(peer->xseg, req, &dummy);
579 out_put:
580         xseg_put_request(peer->xseg, req, pr->portno);
581 out_err:
582         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
583         return NULL;
584 }
585
586 static int write_map(struct peer_req* pr, struct map *map)
587 {
588         int r = 0;
589         struct peerd *peer = pr->peer;
590         map->flags |= MF_MAP_WRITING;
591         struct xseg_request *req = __write_map(pr, map);
592         if (!req)
593                 return -1;
594         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595         if (req->state & XS_FAILED)
596                 r = -1;
597         xseg_put_request(peer->xseg, req, pr->portno);
598         map->flags &= ~MF_MAP_WRITING;
599         return r;
600 }
601
602 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
603 {
604         int r;
605         xport p;
606         struct xseg_request *req;
607         struct peerd *peer = pr->peer;
608         struct mapperd *mapper = __get_mapperd(peer);
609         void *dummy;
610
611         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
612
613         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
614         if (!req){
615                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
616                                 m->volume);
617                 goto out_fail;
618         }
619
620         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
621         if (r < 0){
622                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
623                                 m->volume);
624                 goto out_put;
625         }
626
627         char *reqtarget = xseg_get_target(peer->xseg, req);
628         if (!reqtarget)
629                 goto out_put;
630         strncpy(reqtarget, m->volume, req->targetlen);
631         req->op = X_READ;
632         req->size = block_size;
633         req->offset = 0;
634         r = xseg_set_req_data(peer->xseg, req, pr);
635         if (r < 0){
636                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
637                                 m->volume);
638                 goto out_put;
639         }
640         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
641         if (p == NoPort){ 
642                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
643                                 m->volume);
644                 goto out_unset;
645         }
646         r = xseg_signal(peer->xseg, p);
647         
648         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
649         return req;
650
651 out_unset:
652         xseg_get_req_data(peer->xseg, req, &dummy);
653 out_put:
654         xseg_put_request(peer->xseg, req, pr->portno);
655 out_fail:
656         return NULL;
657 }
658
659 static int read_map (struct map *map, char *buf)
660 {
661         char nulls[SHA256_DIGEST_SIZE];
662         memset(nulls, 0, SHA256_DIGEST_SIZE);
663
664         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
665         if (r) {
666                 XSEGLOG2(&lc, D, "Read zeros");
667                 //read error;
668                 return -1;
669         }
670         //type 1, our type, type 0 pithos map
671         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
673         uint64_t pos;
674         uint64_t i, nr_objs;
675         struct map_node *map_node;
676         if (type) {
677                 pos = SHA256_DIGEST_SIZE;
678                 map->size = *(uint64_t *) (buf + pos);
679                 pos += sizeof(uint64_t);
680                 nr_objs = map->size / block_size;
681                 if (map->size % block_size)
682                         nr_objs++;
683                 map_node = calloc(nr_objs, sizeof(struct map_node));
684                 if (!map_node)
685                         return -1;
686
687                 for (i = 0; i < nr_objs; i++) {
688                         map_node[i].map = map;
689                         map_node[i].objectidx = i;
690                         map_node[i].waiters = 0;
691                         map_node[i].ref = 1;
692                         map_node[i].cond = st_cond_new(); //FIXME err check;
693                         map_to_object(&map_node[i], buf + pos);
694                         pos += objectsize_in_map;
695                         r = insert_object(map, &map_node[i]); //FIXME error check
696                 }
697         } else {
698                 pos = 0;
699                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
701                 if (!map_node)
702                         return -1;
703                 for (i = 0; i < max_nr_objs; i++) {
704                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
705                                 break;
706                         map_node[i].objectidx = i;
707                         map_node[i].map = map;
708                         map_node[i].waiters = 0;
709                         map_node[i].ref = 1;
710                         map_node[i].cond = st_cond_new(); //FIXME err check;
711                         pithosmap_to_object(&map_node[i], buf + pos);
712                         pos += SHA256_DIGEST_SIZE; 
713                         r = insert_object(map, &map_node[i]); //FIXME error check
714                 }
715                 map->size = i * block_size; 
716         }
717         print_map(map);
718         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
719         return 0;
720
721         //FIXME cleanup on error
722 }
723
724 static int load_map(struct peer_req *pr, struct map *map)
725 {
726         int r = 0;
727         struct xseg_request *req;
728         struct peerd *peer = pr->peer;
729         map->flags |= MF_MAP_LOADING;
730         req = __load_map(pr, map);
731         if (!req)
732                 return -1;
733         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734         map->flags &= ~MF_MAP_LOADING;
735         if (req->state & XS_FAILED){
736                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737                 xseg_put_request(peer->xseg, req, pr->portno);
738                 return -1;
739         }
740         r = read_map(map, xseg_get_data(peer->xseg, req));
741         xseg_put_request(peer->xseg, req, pr->portno);
742         return r;
743 }
744
745 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
746 {
747         int r;
748         xport p;
749         struct xseg_request *req;
750         struct peerd *peer = pr->peer;
751         struct mapperd *mapper = __get_mapperd(peer);
752         void *dummy;
753
754         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
755
756         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
757         if (!req){
758                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
759                                 m->volume);
760                 goto out_fail;
761         }
762
763         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
764         if (r < 0){
765                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
766                                 m->volume);
767                 goto out_put;
768         }
769
770         char *reqtarget = xseg_get_target(peer->xseg, req);
771         if (!reqtarget)
772                 goto out_put;
773         strncpy(reqtarget, m->volume, req->targetlen);
774         req->op = X_OPEN;
775         req->size = block_size;
776         req->offset = 0;
777         r = xseg_set_req_data(peer->xseg, req, pr);
778         if (r < 0){
779                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
780                                 m->volume);
781                 goto out_put;
782         }
783         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
784         if (p == NoPort){ 
785                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
786                                 m->volume);
787                 goto out_unset;
788         }
789         r = xseg_signal(peer->xseg, p);
790         
791         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
792         return req;
793
794 out_unset:
795         xseg_get_req_data(peer->xseg, req, &dummy);
796 out_put:
797         xseg_put_request(peer->xseg, req, pr->portno);
798 out_fail:
799         return NULL;
800 }
801
802 static int open_map(struct peer_req *pr, struct map *map)
803 {
804         int err;
805         struct xseg_request *req;
806         struct peerd *peer = pr->peer;
807
808         map->flags |= MF_MAP_OPENING;
809         req = __open_map(pr, map);
810         if (!req)
811                 return -1;
812         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
813         map->flags &= ~MF_MAP_OPENING;
814         err = req->state & XS_FAILED;
815         xseg_put_request(peer->xseg, req, pr->portno);
816         if (err)
817                 return -1;
818         else 
819                 map->flags |= MF_MAP_EXCLUSIVE;
820         return 0;
821 }
822
823 /*
824  * copy up functions
825  */
826
827 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
828 {
829         int r = 0;
830         if (mn){
831                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
832                 if (r == -XHASH_ERESIZE) {
833                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
834                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
835                         if (!new_hashmap)
836                                 goto out;
837                         mio->copyups_nodes = new_hashmap;
838                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
839                 }
840         }
841         else {
842                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
843                 if (r == -XHASH_ERESIZE) {
844                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
845                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
846                         if (!new_hashmap)
847                                 goto out;
848                         mio->copyups_nodes = new_hashmap;
849                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
850                 }
851         }
852 out:
853         return r;
854 }
855
856 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
857 {
858         struct map_node *mn;
859         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
860         if (r < 0)
861                 return NULL;
862         return mn;
863 }
864
865 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
866 {
867         struct mapperd *mapper = __get_mapperd(peer);
868         struct mapper_io *mio = __get_mapper_io(pr);
869         struct map *map = mn->map;
870         void *dummy;
871         int r = -1, i;
872         xport p;
873
874         //struct sha256_ctx sha256ctx;
875         uint32_t newtargetlen;
876         char new_target[XSEG_MAX_TARGETLEN + 1]; 
877         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
878         char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
879         strncpy(new_object, map->volume, map->volumelen);
880         sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
881         new_object[XSEG_MAX_TARGETLEN + 19] = 0;
882
883         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
884         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
885                 sprintf (new_target + 2*i, "%02x", buf[i]);
886         newtargetlen = SHA256_DIGEST_SIZE  * 2;
887
888         if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
889                 goto copyup_zeroblock;
890
891         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
892                                                         mapper->bportno, X_ALLOC);
893         if (!req){
894                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
895                 goto out_err;
896         }
897         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
898                                 sizeof(struct xseg_request_copy));
899         if (r < 0){
900                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
901                 goto out_put;
902         }
903
904         char *target = xseg_get_target(peer->xseg, req);
905         strncpy(target, new_target, req->targetlen);
906
907         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
908         strncpy(xcopy->target, mn->object, mn->objectlen);
909         xcopy->targetlen = mn->objectlen;
910
911         req->offset = 0;
912         req->size = block_size;
913         req->op = X_COPY;
914         r = xseg_set_req_data(peer->xseg, req, pr);
915         if (r<0){
916                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
917                 goto out_put;
918         }
919         r = __set_copyup_node(mio, req, mn);
920         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
921         if (p == NoPort) {
922                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
923                 goto out_unset;
924         }
925         xseg_signal(peer->xseg, p);
926 //      mio->copyups++;
927
928         mn->flags |= MF_OBJECT_COPYING;
929         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
930         return req;
931
932 out_unset:
933         r = __set_copyup_node(mio, req, NULL);
934         xseg_get_req_data(peer->xseg, req, &dummy);
935 out_put:
936         xseg_put_request(peer->xseg, req, pr->portno);
937 out_err:
938         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
939         return NULL;
940
941 copyup_zeroblock:
942         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
943                         "Proceeding in writing the new object in map");
944         /* construct a tmp map_node for writing purposes */
945         struct map_node newmn = *mn;
946         newmn.flags = MF_OBJECT_EXIST;
947         strncpy(newmn.object, new_target, newtargetlen);
948         newmn.object[newtargetlen] = 0;
949         newmn.objectlen = newtargetlen;
950         newmn.objectidx = mn->objectidx; 
951         req = object_write(peer, pr, map, &newmn);
952         r = __set_copyup_node(mio, req, mn);
953         if (!req){
954                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
955                                 "\n\t of map %s [%llu]",
956                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
957                 return NULL;
958         }
959         mn->flags |= MF_OBJECT_WRITING;
960         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
961         return req;
962 }
963
964 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
965 {
966         void *dummy;
967         struct peerd *peer = pr->peer;
968         struct mapperd *mapper = __get_mapperd(peer);
969         struct mapper_io *mio = __get_mapper_io(pr);
970         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
971                                                         mapper->bportno, X_ALLOC);
972         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
973         if (!req){
974                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
975                 goto out_err;
976         }
977         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
978         if (r < 0){
979                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
980                 goto out_put;
981         }
982         char *target = xseg_get_target(peer->xseg, req);
983         strncpy(target, mn->object, req->targetlen);
984         req->op = X_DELETE;
985         req->size = req->datalen;
986         req->offset = 0;
987         r = xseg_set_req_data(peer->xseg, req, pr);
988         if (r < 0){
989                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
990                 goto out_put;
991         }
992         __set_copyup_node(mio, req, mn);
993         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
994         if (p == NoPort){
995                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
996                 goto out_unset;
997         }
998         r = xseg_signal(peer->xseg, p);
999         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1000         return req;
1001
1002 out_unset:
1003         xseg_get_req_data(peer->xseg, req, &dummy);
1004 out_put:
1005         xseg_put_request(peer->xseg, req, pr->portno);
1006 out_err:
1007         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1008         return NULL;
1009 }
1010
1011 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1012 {
1013         void *dummy;
1014         struct peerd *peer = pr->peer;
1015         struct mapperd *mapper = __get_mapperd(peer);
1016         struct mapper_io *mio = __get_mapper_io(pr);
1017         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1018                                                         mapper->mbportno, X_ALLOC);
1019         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1020         if (!req){
1021                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1022                 goto out_err;
1023         }
1024         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1025         if (r < 0){
1026                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1027                 goto out_put;
1028         }
1029         char *target = xseg_get_target(peer->xseg, req);
1030         strncpy(target, map->volume, req->targetlen);
1031         req->op = X_DELETE;
1032         req->size = req->datalen;
1033         req->offset = 0;
1034         r = xseg_set_req_data(peer->xseg, req, pr);
1035         if (r < 0){
1036                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1037                 goto out_put;
1038         }
1039         __set_copyup_node(mio, req, NULL);
1040         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1041         if (p == NoPort){
1042                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1043                 goto out_unset;
1044         }
1045         r = xseg_signal(peer->xseg, p);
1046         map->flags |= MF_MAP_DELETING;
1047         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1048         return req;
1049
1050 out_unset:
1051         xseg_get_req_data(peer->xseg, req, &dummy);
1052 out_put:
1053         xseg_put_request(peer->xseg, req, pr->portno);
1054 out_err:
1055         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1056         return  NULL;
1057 }
1058
1059
1060 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1061 {
1062         struct map_node *mn = find_object(map, index);
1063         if (mn)
1064                 mn->ref++;
1065         return mn;
1066 }
1067
1068 static inline void put_mapnode(struct map_node *mn)
1069 {
1070         mn->ref--;
1071         if (!mn->ref){
1072                 //clean up mn
1073                 st_cond_destroy(mn->cond);
1074         }
1075 }
1076
1077 static inline void __get_map(struct map *map)
1078 {
1079         map->ref++;
1080 }
1081
1082 static inline void put_map(struct map *map)
1083 {
1084         struct map_node *mn;
1085         map->ref--;
1086         if (!map->ref){
1087                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1088                 //clean up map
1089                 uint64_t i;
1090                 for (i = 0; i < calc_map_obj(map); i++) {
1091                         mn = get_mapnode(map, i);
1092                         if (mn) {
1093                                 //make sure all pending operations on all objects are completed
1094                                 if (mn->flags & MF_OBJECT_NOT_READY){
1095                                         //this should never happen...
1096                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1097                                 }
1098                                 mn->flags &= MF_OBJECT_DESTROYED;
1099                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1100                                 put_mapnode(mn); //matcing get_mapnode;
1101                                 //assert mn->ref == 0;
1102                         }
1103                 }
1104                 mn = find_object(map, 0);
1105                 if (mn)
1106                         free(mn);
1107                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1108                 free(map);
1109         }
1110 }
1111
1112 static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1113 {
1114         int r;
1115         struct map *m = malloc(sizeof(struct map));
1116         if (!m){
1117                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1118                 goto out_err;
1119         }
1120         m->size = -1;
1121         strncpy(m->volume, name, namelen);
1122         m->volume[namelen] = 0;
1123         m->volumelen = namelen;
1124         m->flags = 0;
1125         m->objects = xhash_new(3, INTEGER); 
1126         if (!m->objects){
1127                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1128                                 m->volume);
1129                 goto out_map;
1130         }
1131         m->ref = 1;
1132         m->waiters = 0;
1133         m->cond = st_cond_new(); //FIXME err check;
1134         r = insert_map(mapper, m);
1135         if (r < 0){
1136                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1137                 goto out_hash;
1138         }
1139
1140         return m;
1141
1142 out_hash:
1143         xhash_free(m->objects);
1144 out_map:
1145         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1146         free(m);
1147 out_err:
1148         return NULL;
1149 }
1150
1151
1152
1153 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1154 {
1155         struct peerd *peer = pr->peer;
1156         struct mapperd *mapper = __get_mapperd(peer);
1157         (void)mapper;
1158         struct mapper_io *mio = __get_mapper_io(pr);
1159         struct map_node *mn = __get_copyup_node(mio, req);
1160
1161         mio->del_pending--;
1162         if (req->state & XS_FAILED){
1163                 mio->err = 1;
1164         }
1165         signal_mapnode(mn);
1166         xseg_put_request(peer->xseg, req, pr->portno);
1167         signal_pr(pr);
1168 }
1169
1170 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1171 {
1172         struct peerd *peer = pr->peer;
1173         struct mapperd *mapper = __get_mapperd(peer);
1174         (void)mapper;
1175         struct mapper_io *mio = __get_mapper_io(pr);
1176         struct map_node *mn = __get_copyup_node(mio, req);
1177         if (!mn){
1178                 XSEGLOG2(&lc, E, "Cannot get map node");
1179                 goto out_err;
1180         }
1181         __set_copyup_node(mio, req, NULL);
1182
1183         if (req->state & XS_FAILED){
1184                 XSEGLOG2(&lc, E, "Req failed");
1185                 mn->flags &= ~MF_OBJECT_COPYING;
1186                 mn->flags &= ~MF_OBJECT_WRITING;
1187                 goto out_err;
1188         }
1189         if (req->op == X_WRITE) {
1190                 char *target = xseg_get_target(peer->xseg, req);
1191                 (void)target;
1192                 //printf("handle object write replyi\n");
1193                 __set_copyup_node(mio, req, NULL);
1194                 //assert mn->flags & MF_OBJECT_WRITING
1195                 mn->flags &= ~MF_OBJECT_WRITING;
1196                 
1197                 struct map_node tmp;
1198                 char *data = xseg_get_data(peer->xseg, req);
1199                 map_to_object(&tmp, data);
1200                 mn->flags |= MF_OBJECT_EXIST;
1201                 if (mn->flags != MF_OBJECT_EXIST){
1202                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1203                         goto out_err;
1204                 }
1205                 //assert mn->flags & MF_OBJECT_EXIST
1206                 strncpy(mn->object, tmp.object, tmp.objectlen);
1207                 mn->object[tmp.objectlen] = 0;
1208                 mn->objectlen = tmp.objectlen;
1209                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1210                 mio->copyups--;
1211                 signal_mapnode(mn);
1212         } else if (req->op == X_COPY) {
1213         //      issue write_object;
1214                 mn->flags &= ~MF_OBJECT_COPYING;
1215                 struct map *map = mn->map;
1216                 if (!map){
1217                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1218                         goto out_err;
1219                 }
1220
1221                 /* construct a tmp map_node for writing purposes */
1222                 char *target = xseg_get_target(peer->xseg, req);
1223                 struct map_node newmn = *mn;
1224                 newmn.flags = MF_OBJECT_EXIST;
1225                 strncpy(newmn.object, target, req->targetlen);
1226                 newmn.object[req->targetlen] = 0;
1227                 newmn.objectlen = req->targetlen;
1228                 newmn.objectidx = mn->objectidx; 
1229                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1230                 if (!xreq){
1231                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1232                                         "\n\t of map %s [%llu]",
1233                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1234                         goto out_err;
1235                 }
1236                 mn->flags |= MF_OBJECT_WRITING;
1237                 __set_copyup_node (mio, xreq, mn);
1238
1239                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1240         } else {
1241                 //wtf??
1242                 ;
1243         }
1244
1245 out:
1246         xseg_put_request(peer->xseg, req, pr->portno);
1247         return;
1248
1249 out_err:
1250         mio->copyups--;
1251         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1252         mio->err = 1;
1253         if (mn)
1254                 signal_mapnode(mn);
1255         goto out;
1256
1257 }
1258
1259 struct r2o {
1260         struct map_node *mn;
1261         uint64_t offset;
1262         uint64_t size;
1263 };
1264
1265 static int req2objs(struct peer_req *pr, struct map *map, int write)
1266 {
1267         int r = 0;
1268         struct peerd *peer = pr->peer;
1269         struct mapper_io *mio = __get_mapper_io(pr);
1270         char *target = xseg_get_target(peer->xseg, pr->req);
1271         uint32_t nr_objs = calc_nr_obj(pr->req);
1272         uint64_t size = sizeof(struct xseg_reply_map) + 
1273                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1274         uint32_t idx, i, ready;
1275         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1276         struct map_node *mn;
1277         mio->copyups = 0;
1278         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1279
1280         /* get map_nodes of request */
1281         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1282         if (!mns){
1283                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1284                 return -1;
1285         }
1286         idx = 0;
1287         rem_size = pr->req->size;
1288         obj_index = pr->req->offset / block_size;
1289         obj_offset = pr->req->offset & (block_size -1); //modulo
1290         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1291         mn = get_mapnode(map, obj_index);
1292         if (!mn) {
1293                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1294                 r = -1;
1295                 goto out;
1296         }
1297         mns[idx].mn = mn;
1298         mns[idx].offset = obj_offset;
1299         mns[idx].size = obj_size;
1300         rem_size -= obj_size;
1301         while (rem_size > 0) {
1302                 idx++;
1303                 obj_index++;
1304                 obj_offset = 0;
1305                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1306                 rem_size -= obj_size;
1307                 mn = get_mapnode(map, obj_index);
1308                 if (!mn) {
1309                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1310                         r = -1;
1311                         goto out;
1312                 }
1313                 mns[idx].mn = mn;
1314                 mns[idx].offset = obj_offset;
1315                 mns[idx].size = obj_size;
1316         }
1317         if (write) {
1318                 ready = 0;
1319                 int can_wait = 0;
1320                 mio->cb=copyup_cb;
1321                 while (ready < (idx + 1)){
1322                         ready = 0;
1323                         for (i = 0; i < (idx+1); i++) {
1324                                 mn = mns[i].mn;
1325                                 //do copyups
1326                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1327                                         if (can_wait){
1328                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1329                                                 if (mn->flags & MF_OBJECT_DELETED){
1330                                                         mio->err = 1;
1331                                                 }
1332                                                 if (mio->err){
1333                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1334                                                         if (!mio->copyups){
1335                                                                 r = -1;
1336                                                                 goto out;
1337                                                         }
1338                                                 }
1339                                         }
1340                                 }
1341                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1342                                         //calc new_target, copy up object
1343                                         if (copyup_object(peer, mn, pr) == NULL){
1344                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1345                                                 mio->err = 1;
1346                                         } else {
1347                                                 mio->copyups++;
1348                                         }
1349                                 } else {
1350                                         ready++;
1351                                 }
1352                         }
1353                         can_wait = 1;
1354                 }
1355                 /*
1356 pending_copyups:
1357                 while(mio->copyups > 0){
1358                         mio->cb = copyup_cb;
1359                         wait_on_pr(pr, 0);
1360                         ta--;
1361                         st_cond_wait(pr->cond);
1362                 }
1363                 */
1364         }
1365
1366         if (mio->err){
1367                 r = -1;
1368                 XSEGLOG2(&lc, E, "Mio->err");
1369                 goto out;
1370         }
1371
1372         /* resize request to fit reply */
1373         char buf[XSEG_MAX_TARGETLEN];
1374         strncpy(buf, target, pr->req->targetlen);
1375         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1376         if (r < 0) {
1377                 XSEGLOG2(&lc, E, "Cannot resize request");
1378                 goto out;
1379         }
1380         target = xseg_get_target(peer->xseg, pr->req);
1381         strncpy(target, buf, pr->req->targetlen);
1382
1383         /* structure reply */
1384         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1385         reply->cnt = nr_objs;
1386         for (i = 0; i < (idx+1); i++) {
1387                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1388                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1389                 reply->segs[i].offset = mns[i].offset;
1390                 reply->segs[i].size = mns[i].size;
1391         }
1392 out:
1393         for (i = 0; i < idx; i++) {
1394                 put_mapnode(mns[i].mn);
1395         }
1396         free(mns);
1397         return r;
1398 }
1399
1400 static int do_dropcache(struct peer_req *pr, struct map *map)
1401 {
1402         struct map_node *mn;
1403         struct peerd *peer = pr->peer;
1404         struct mapperd *mapper = __get_mapperd(peer);
1405         uint64_t i;
1406         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1407         map->flags |= MF_MAP_DROPPING_CACHE;
1408         for (i = 0; i < calc_map_obj(map); i++) {
1409                 mn = get_mapnode(map, i);
1410                 if (mn) {
1411                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1412                                 //make sure all pending operations on all objects are completed
1413                                 if (mn->flags & MF_OBJECT_NOT_READY){
1414                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1415                                 }
1416                                 mn->flags &= MF_OBJECT_DESTROYED;
1417                         }
1418                         put_mapnode(mn);
1419                 }
1420         }
1421         map->flags &= ~MF_MAP_DROPPING_CACHE;
1422         map->flags |= MF_MAP_DESTROYED;
1423         remove_map(mapper, map);
1424         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1425         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1426         return 0;
1427 }
1428
1429 static int do_info(struct peer_req *pr, struct map *map)
1430 {
1431         struct peerd *peer = pr->peer;
1432         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1433         xinfo->size = map->size;
1434         return 0;
1435 }
1436
1437
1438 static int do_close(struct peer_req *pr, struct map *map)
1439 {
1440 //      struct peerd *peer = pr->peer;
1441 //      struct xseg_request *req;
1442         if (map->flags & MF_MAP_EXCLUSIVE) 
1443                 close_map(pr, map);
1444         return do_dropcache(pr, map);
1445 }
1446
1447 static int do_destroy(struct peer_req *pr, struct map *map)
1448 {
1449         uint64_t i;
1450         struct peerd *peer = pr->peer;
1451         struct mapper_io *mio = __get_mapper_io(pr);
1452         struct map_node *mn;
1453         struct xseg_request *req;
1454         
1455         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1456         map->flags |= MF_MAP_DELETING;
1457         req = delete_map(pr, map);
1458         if (!req)
1459                 return -1;
1460         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1461         if (req->state & XS_FAILED){
1462                 xseg_put_request(peer->xseg, req, pr->portno);
1463                 map->flags &= ~MF_MAP_DELETING;
1464                 return -1;
1465         }
1466         xseg_put_request(peer->xseg, req, pr->portno);
1467         //FIXME
1468         uint64_t nr_obj = calc_map_obj(map);
1469         uint64_t deleted = 0;
1470         while (deleted < nr_obj){ 
1471                 deleted = 0;
1472                 for (i = 0; i < nr_obj; i++){
1473                         mn = get_mapnode(map, i);
1474                         if (mn) {
1475                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1476                                         if (mn->flags & MF_OBJECT_EXIST){
1477                                                 //make sure all pending operations on all objects are completed
1478                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1479                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1480                                                 }
1481                                                 req = delete_object(pr, mn);
1482                                                 if (!req)
1483                                                         if (mio->del_pending){
1484                                                                 goto wait_pending;
1485                                                         } else {
1486                                                                 continue;
1487                                                         }
1488                                                 else {
1489                                                         mio->del_pending++;
1490                                                 }
1491                                         }
1492                                         mn->flags &= MF_OBJECT_DESTROYED;
1493                                 }
1494                                 put_mapnode(mn);
1495                         }
1496                         deleted++;
1497                 }
1498 wait_pending:
1499                 mio->cb = deletion_cb;
1500                 wait_on_pr(pr, mio->del_pending > 0);
1501         }
1502         mio->cb = NULL;
1503         map->flags &= ~MF_MAP_DELETING;
1504         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1505         return do_close(pr, map);
1506 }
1507
1508 static int do_mapr(struct peer_req *pr, struct map *map)
1509 {
1510         struct peerd *peer = pr->peer;
1511         int r = req2objs(pr, map, 0);
1512         if  (r < 0){
1513                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1514                                 map->volume, 
1515                                 (unsigned long long) pr->req->offset, 
1516                                 (unsigned long long) (pr->req->offset + pr->req->size));
1517                 return -1;
1518         }
1519         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1520                         map->volume, 
1521                         (unsigned long long) pr->req->offset, 
1522                         (unsigned long long) (pr->req->offset + pr->req->size));
1523         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1524                         (unsigned long long) pr->req->offset,
1525                         (unsigned long long) pr->req->size);
1526         char buf[XSEG_MAX_TARGETLEN+1];
1527         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1528         int i;
1529         for (i = 0; i < reply->cnt; i++) {
1530                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1531                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1532                 buf[reply->segs[i].targetlen] = 0;
1533                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1534                                 (unsigned long long) reply->segs[i].offset,
1535                                 (unsigned long long) reply->segs[i].size);
1536         }
1537         return 0;
1538 }
1539
1540 static int do_mapw(struct peer_req *pr, struct map *map)
1541 {
1542         struct peerd *peer = pr->peer;
1543         int r = req2objs(pr, map, 1);
1544         if  (r < 0){
1545                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1546                                 map->volume, 
1547                                 (unsigned long long) pr->req->offset, 
1548                                 (unsigned long long) (pr->req->offset + pr->req->size));
1549                 return -1;
1550         }
1551         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1552                         map->volume, 
1553                         (unsigned long long) pr->req->offset, 
1554                         (unsigned long long) (pr->req->offset + pr->req->size));
1555         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1556                         (unsigned long long) pr->req->offset,
1557                         (unsigned long long) pr->req->size);
1558         char buf[XSEG_MAX_TARGETLEN+1];
1559         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1560         int i;
1561         for (i = 0; i < reply->cnt; i++) {
1562                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1563                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1564                 buf[reply->segs[i].targetlen] = 0;
1565                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1566                                 (unsigned long long) reply->segs[i].offset,
1567                                 (unsigned long long) reply->segs[i].size);
1568         }
1569         return 0;
1570 }
1571
1572 //here map is the parent map
1573 static int do_clone(struct peer_req *pr, struct map *map)
1574 {
1575         /*
1576         FIXME check if clone map exists
1577         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1578         if (clonemap)
1579                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1580                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1581                 return -1;
1582         */
1583
1584         int r;
1585         char buf[XSEG_MAX_TARGETLEN];
1586         struct peerd *peer = pr->peer;
1587         struct mapperd *mapper = __get_mapperd(peer);
1588         char *target = xseg_get_target(peer->xseg, pr->req);
1589         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1590         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1591         struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1592         if (!clonemap) 
1593                 return -1;
1594
1595         if (xclone->size == -1)
1596                 clonemap->size = map->size;
1597         else
1598                 clonemap->size = xclone->size;
1599         if (clonemap->size < map->size){
1600                 target = xseg_get_target(peer->xseg, pr->req);
1601                 strncpy(buf, target, pr->req->targetlen);
1602                 buf[pr->req->targetlen] = 0;
1603                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1604                                 "\n\t for requested clone %s",
1605                                 (unsigned long long) xclone->size,
1606                                 (unsigned long long) map->size, buf);
1607                 goto out_err;
1608         }
1609         
1610         //alloc and init map_nodes
1611         unsigned long c = clonemap->size/block_size + 1;
1612         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1613         if (!map_nodes){
1614                 goto out_err;
1615         }
1616         int i;
1617         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1618                 struct map_node *mn = get_mapnode(map, i);
1619                 if (mn) {
1620                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1621                         map_nodes[i].objectlen = mn->objectlen;
1622                         put_mapnode(mn);
1623                 } else {
1624                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1625                         map_nodes[i].objectlen = strlen(zero_block);
1626                 }
1627                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1628                 map_nodes[i].flags = 0;
1629                 map_nodes[i].objectidx = i;
1630                 map_nodes[i].map = clonemap;
1631                 map_nodes[i].ref = 1;
1632                 map_nodes[i].waiters = 0;
1633                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1634                 r = insert_object(clonemap, &map_nodes[i]);
1635                 if (r < 0){
1636                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1637                         goto out_err;
1638                 }
1639         }
1640         r = write_map(pr, clonemap);
1641         if (r < 0){
1642                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1643                 goto out_err;
1644         }
1645         return 0;
1646
1647 out_err:
1648         put_map(clonemap);
1649         return -1;
1650 }
1651
1652 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1653 {
1654         int r, opened = 0;
1655 retry_open:
1656         if (flags & MF_EXCLUSIVE){
1657                 r = open_map(pr, map);
1658                 if (r < 0) {
1659                         if (flags & MF_FORCE){
1660
1661                                 goto retry_open;
1662                         }
1663                 } else {
1664                         opened = 1;
1665                 }
1666         }
1667         r = load_map(pr, map);
1668         if (r < 0 && opened){
1669                 close_map(pr, map);
1670         }
1671         return r;
1672 }
1673
1674 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1675 {
1676         int r;
1677         struct peerd *peer = pr->peer;
1678         struct mapperd *mapper = __get_mapperd(peer);
1679         struct map *map = find_map(mapper, name, namelen);
1680         if (!map){
1681                 if (flags & MF_LOAD){
1682                         map = create_map(mapper, name, namelen);
1683                         if (!map)
1684                                 return NULL;
1685                         r = open_load_map(pr, map, flags);
1686                         if (r < 0){
1687                                 do_dropcache(pr, map);
1688                                 return NULL;
1689                         }
1690                 } else {
1691                         return NULL;
1692                 }
1693         } else if (map->flags & MF_MAP_DESTROYED){
1694                 return NULL;
1695         }
1696         __get_map(map);
1697         return map;
1698
1699 }
1700
1701 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1702                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1703 {
1704         //struct peerd *peer = pr->peer;
1705         struct map *map;
1706 start:
1707         map = get_map(pr, name, namelen, flags);
1708         if (!map)
1709                 return -1;
1710         if (map->flags & MF_MAP_NOT_READY){
1711                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1712                 put_map(map);
1713                 goto start;
1714         }
1715         int r = action(pr, map);
1716         //always drop cache if map not read exclusively
1717         if (!(map->flags & MF_MAP_EXCLUSIVE))
1718                 do_dropcache(pr, map);
1719         //maybe capture ref before and compare here?
1720         if (map->ref > 1){
1721                 signal_map(map);
1722         }
1723         put_map(map);
1724         return r;
1725 }
1726
1727 void * handle_info(struct peer_req *pr)
1728 {
1729         struct peerd *peer = pr->peer;
1730         char *target = xseg_get_target(peer->xseg, pr->req);
1731         int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
1732         if (r < 0)
1733                 fail(peer, pr);
1734         else
1735                 complete(peer, pr);
1736         ta--;
1737         return NULL;
1738 }
1739
1740 void * handle_clone(struct peer_req *pr)
1741 {
1742         int r;
1743         struct peerd *peer = pr->peer;
1744         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1745         if (!xclone) {
1746                 r = -1;
1747                 goto out;
1748         }
1749         if (xclone->targetlen){
1750                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
1751         } else {
1752                 if (!xclone->size){
1753                         r = -1;
1754                 } else {
1755                         struct map *map;
1756                         char *target = xseg_get_target(peer->xseg, pr->req);
1757                         XSEGLOG2(&lc, I, "Creating volume");
1758                         map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
1759                         if (map){
1760                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1761                                 if (map->ref <= 2) //initial one + one ref from __get_map
1762                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1763                                 put_map(map); //matches get_map
1764                                 r = -1;
1765                                 goto out;
1766                         }
1767                         //create a new empty map of size
1768                         map = create_map(mapper, target, pr->req->targetlen);
1769                         if (!map){
1770                                 r = -1;
1771                                 goto out;
1772                         }
1773                         map->size = xclone->size;
1774                         //populate_map with zero objects;
1775                         uint64_t nr_objs = xclone->size / block_size;
1776                         if (xclone->size % block_size)
1777                                 nr_objs++;
1778                                 
1779                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1780                         if (!map_nodes){
1781                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1782                                 r = -1;
1783                                 goto out;
1784                         }
1785                         uint64_t i;
1786                         for (i = 0; i < nr_objs; i++) {
1787                                 strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1788                                 map_nodes[i].objectlen = strlen(zero_block);
1789                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1790                                 map_nodes[i].flags = 0;
1791                                 map_nodes[i].objectidx = i;
1792                                 map_nodes[i].map = map;
1793                                 map_nodes[i].ref = 1;
1794                                 map_nodes[i].waiters = 0;
1795                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1796                                 r = insert_object(map, &map_nodes[i]);
1797                                 if (r < 0){
1798                                         do_dropcache(pr, map);
1799                                         r = -1;
1800                                         goto out;
1801                                 }
1802                         }
1803                         r = write_map(pr, map);
1804                         if (r < 0){
1805                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1806                                 do_dropcache(pr, map);
1807                                 goto out;
1808                         }
1809                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1810                         r = 0;
1811                         do_dropcache(pr, map); //drop cache here for consistency
1812                 }
1813         }
1814 out:
1815         if (r < 0)
1816                 fail(peer, pr);
1817         else
1818                 complete(peer, pr);
1819         ta--;
1820         return NULL;
1821 }
1822
1823 void * handle_mapr(struct peer_req *pr)
1824 {
1825         struct peerd *peer = pr->peer;
1826         char *target = xseg_get_target(peer->xseg, pr->req);
1827         int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
1828         if (r < 0)
1829                 fail(peer, pr);
1830         else
1831                 complete(peer, pr);
1832         ta--;
1833         return NULL;
1834 }
1835
1836 void * handle_mapw(struct peer_req *pr)
1837 {
1838         struct peerd *peer = pr->peer;
1839         char *target = xseg_get_target(peer->xseg, pr->req);
1840         int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1841         if (r < 0)
1842                 fail(peer, pr);
1843         else
1844                 complete(peer, pr);
1845         XSEGLOG2(&lc, D, "Ta: %d", ta);
1846         ta--;
1847         return NULL;
1848 }
1849
1850 void * handle_destroy(struct peer_req *pr)
1851 {
1852         struct peerd *peer = pr->peer;
1853         char *target = xseg_get_target(peer->xseg, pr->req);
1854         int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1855         if (r < 0)
1856                 fail(peer, pr);
1857         else
1858                 complete(peer, pr);
1859         ta--;
1860         return NULL;
1861 }
1862
1863 void * handle_close(struct peer_req *pr)
1864 {
1865         struct peerd *peer = pr->peer;
1866         char *target = xseg_get_target(peer->xseg, pr->req);
1867         //here we do not want to load
1868         int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
1869         if (r < 0)
1870                 fail(peer, pr);
1871         else
1872                 complete(peer, pr);
1873         ta--;
1874         return NULL;
1875 }
1876
1877 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1878                         struct xseg_request *req)
1879 {
1880         //struct mapperd *mapper = __get_mapperd(peer);
1881         struct mapper_io *mio = __get_mapper_io(pr);
1882         void *(*action)(struct peer_req *) = NULL;
1883
1884         mio->state = ACCEPTED;
1885         mio->err = 0;
1886         mio->cb = NULL;
1887         switch (pr->req->op) {
1888                 /* primary xseg operations of mapper */
1889                 case X_CLONE: action = handle_clone; break;
1890                 case X_MAPR: action = handle_mapr; break;
1891                 case X_MAPW: action = handle_mapw; break;
1892 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1893                 case X_INFO: action = handle_info; break;
1894                 case X_DELETE: action = handle_destroy; break;
1895                 case X_CLOSE: action = handle_close; break;
1896                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1897         }
1898         if (action){
1899                 ta++;
1900                 mio->active = 1;
1901                 st_thread_create(action, pr, 0, 0);
1902         }
1903         return 0;
1904
1905 }
1906
1907 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1908                 enum dispatch_reason reason)
1909 {
1910         struct mapperd *mapper = __get_mapperd(peer);
1911         (void) mapper;
1912         struct mapper_io *mio = __get_mapper_io(pr);
1913         (void) mio;
1914
1915
1916         if (reason == dispatch_accept)
1917                 dispatch_accepted(peer, pr, req);
1918         else {
1919                 if (mio->cb){
1920                         mio->cb(pr, req);
1921                 } else { 
1922                         signal_pr(pr);
1923                 }
1924         }
1925         return 0;
1926 }
1927
1928 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1929 {
1930         int i;
1931         unsigned char buf[SHA256_DIGEST_SIZE];
1932         unsigned char *zero;
1933
1934         gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1935
1936         /* Version check should be the very first call because it
1937           makes sure that important subsystems are intialized. */
1938         gcry_check_version (NULL);
1939      
1940         /* Disable secure memory.  */
1941         gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1942      
1943         /* Tell Libgcrypt that initialization has completed. */
1944         gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1945
1946         /* calculate out magic sha hash value */
1947         gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1948
1949         /* calculate zero block */
1950         //FIXME check hash value
1951         zero = malloc(block_size);
1952         memset(zero, 0, block_size);
1953         gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1954         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1955                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1956         printf("%s \n", zero_block);
1957         free(zero);
1958
1959         //FIXME error checks
1960         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1961         peer->priv = mapperd;
1962         mapper = mapperd;
1963         mapper->hashmaps = xhash_new(3, STRING);
1964         
1965         for (i = 0; i < peer->nr_ops; i++) {
1966                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1967                 mio->copyups_nodes = xhash_new(3, INTEGER);
1968                 mio->copyups = 0;
1969                 mio->err = 0;
1970                 mio->active = 0;
1971                 peer->peer_reqs[i].priv = mio;
1972         }
1973
1974         for (i = 0; i < argc; i++) {
1975                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1976                         mapper->bportno = atoi(argv[i+1]);
1977                         i += 1;
1978                         continue;
1979                 }
1980                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1981                         mapper->mbportno = atoi(argv[i+1]);
1982                         i += 1;
1983                         continue;
1984                 }
1985                 /* enforce only one thread */
1986                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1987                         int t = atoi(argv[i+1]);
1988                         if (t != 1) {
1989                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1990                                 return -1;
1991                         }
1992                         i += 1;
1993                         continue;
1994                 }
1995         }
1996
1997         const struct sched_param param = { .sched_priority = 99 };
1998         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1999
2000
2001 //      test_map(peer);
2002
2003         return 0;
2004 }
2005
2006 void print_obj(struct map_node *mn)
2007 {
2008         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2009                         (unsigned long long) mn->objectidx, mn->object, 
2010                         (unsigned int) mn->objectlen, 
2011                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2012 }
2013
2014 void print_map(struct map *m)
2015 {
2016         uint64_t nr_objs = m->size/block_size;
2017         if (m->size % block_size)
2018                 nr_objs++;
2019         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2020                         m->volume, m->volumelen, 
2021                         (unsigned long long) m->size, 
2022                         (unsigned long long) nr_objs);
2023         uint64_t i;
2024         struct map_node *mn;
2025         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2026                 return;
2027         for (i = 0; i < nr_objs; i++) {
2028                 mn = find_object(m, i);
2029                 if (!mn){
2030                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2031                         continue;
2032                 }
2033                 print_obj(mn);
2034         }
2035 }
2036
2037 /*
2038 void test_map(struct peerd *peer)
2039 {
2040         int i,j, ret;
2041         //struct sha256_ctx sha256ctx;
2042         unsigned char buf[SHA256_DIGEST_SIZE];
2043         char buf_new[XSEG_MAX_TARGETLEN + 20];
2044         struct map *m = malloc(sizeof(struct map));
2045         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2046         m->volume[XSEG_MAX_TARGETLEN] = 0;
2047         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2048         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2049         m->volumelen = XSEG_MAX_TARGETLEN;
2050         m->size = 100*block_size;
2051         m->objects = xhash_new(3, INTEGER);
2052         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2053         for (i = 0; i < 100; i++) {
2054                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2055                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2056                 
2057                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2058                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2059                 }
2060                 map_node[i].objectidx = i;
2061                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2062                 map_node[i].flags = MF_OBJECT_EXIST;
2063                 ret = insert_object(m, &map_node[i]);
2064         }
2065
2066         char *data = malloc(block_size);
2067         mapheader_to_map(m, data);
2068         uint64_t pos = mapheader_size;
2069
2070         for (i = 0; i < 100; i++) {
2071                 map_node = find_object(m, i);
2072                 if (!map_node){
2073                         printf("no object node %d \n", i);
2074                         exit(1);
2075                 }
2076                 object_to_map(data+pos, map_node);
2077                 pos += objectsize_in_map;
2078         }
2079 //      print_map(m);
2080
2081         struct map *m2 = malloc(sizeof(struct map));
2082         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2083         m->volume[XSEG_MAX_TARGETLEN] = 0;
2084         m->volumelen = XSEG_MAX_TARGETLEN;
2085
2086         m2->objects = xhash_new(3, INTEGER);
2087         ret = read_map(peer, m2, data);
2088 //      print_map(m2);
2089
2090         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2091         ssize_t r, sum = 0;
2092         while (sum < block_size) {
2093                 r = write(fd, data + sum, block_size -sum);
2094                 if (r < 0){
2095                         perror("write");
2096                         printf("write error\n");
2097                         exit(1);
2098                 } 
2099                 sum += r;
2100         }
2101         close(fd);
2102         map_node = find_object(m, 0);
2103         free(map_node);
2104         free(m);
2105 }
2106 */