add improved argument parsing. also add helper messages
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 //#include <gcrypt.h>
14 #include <errno.h>
15 #include <sched.h>
16 #include <sys/syscall.h>
17
18 //GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22 #define MF_ARCHIP       (1 << 3)
23
24 #define SHA256_DIGEST_SIZE 32
25 /* hex representation of sha256 value takes up double the sha256 size */
26 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
27
28 #define MAPPER_PREFIX "archip_"
29 #define MAPPER_PREFIX_LEN 7
30
31 #define block_size (1<<22) //FIXME this should be defined here?
32
33 /* transparency byte + max object len */
34 #define objectsize_in_map (1 + XSEG_MAX_TARGETLEN)
35
36 /* volume size */
37 #define mapheader_size (sizeof(uint64_t))
38
39 /* reserve enough space to hold _%u object idx */
40 #define MAX_VOLUME_LEN (XSEG_MAX_TARGETLEN - 20)
41
42 #define MF_OBJECT_EXIST         (1 << 0)
43 #define MF_OBJECT_COPYING       (1 << 1)
44 #define MF_OBJECT_WRITING       (1 << 2)
45 #define MF_OBJECT_DELETING      (1 << 3)
46 #define MF_OBJECT_DELETED       (1 << 4)
47 #define MF_OBJECT_DESTROYED     (1 << 5)
48
49 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
50
51 //char *magic_string = "This a magic string. Please hash me";
52 //unsigned char magic_sha256[SHA256_DIGEST_SIZE];       /* sha256 hash value of magic string */
53 //char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
54
55 char *zero_block="zeroblock";
56 #define ZERO_BLOCK_LEN 9 /* strlen(zero_block) */
57
58 //dispatch_internal mapper states
59 enum mapper_state {
60         ACCEPTED = 0,
61         WRITING = 1,
62         COPYING = 2,
63         DELETING = 3,
64         DROPPING_CACHE = 4
65 };
66
67 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
68
69 struct map_node {
70         uint32_t flags;
71         uint32_t objectidx;
72         uint32_t objectlen;
73         char object[XSEG_MAX_TARGETLEN + 1];    /* NULL terminated string */
74         struct map *map;
75         uint32_t ref;
76         uint32_t waiters;
77         st_cond_t cond;
78 };
79
80 #define MF_MAP_LOADING          (1 << 0)
81 #define MF_MAP_DESTROYED        (1 << 1)
82 #define MF_MAP_WRITING          (1 << 2)
83 #define MF_MAP_DELETING         (1 << 3)
84 #define MF_MAP_DROPPING_CACHE   (1 << 4)
85 #define MF_MAP_EXCLUSIVE        (1 << 5)
86 #define MF_MAP_OPENING          (1 << 6)
87 #define MF_MAP_CLOSING          (1 << 7)
88
89 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
90                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
91
92 #define wait_on_pr(__pr, __condition__)         \
93         while (__condition__){                  \
94                 ta--;                           \
95                 __get_mapper_io(pr)->active = 0;\
96                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
97                 st_cond_wait(__pr->cond);       \
98         }
99
100 #define wait_on_mapnode(__mn, __condition__)    \
101         while (__condition__){                  \
102                 ta--;                           \
103                 __mn->waiters++;                \
104                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
105                 st_cond_wait(__mn->cond);       \
106         }
107
108 #define wait_on_map(__map, __condition__)       \
109         while (__condition__){                  \
110                 ta--;                           \
111                 __map->waiters++;               \
112                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
113                 st_cond_wait(__map->cond);      \
114         }
115
116 #define signal_pr(__pr)                         \
117         do {                                    \
118                 if (!__get_mapper_io(pr)->active){\
119                         ta++;                   \
120                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
121                         __get_mapper_io(pr)->active = 1;\
122                         st_cond_signal(__pr->cond);     \
123                 }                               \
124         }while(0)
125
126 #define signal_map(__map)                       \
127         do {                                    \
128                 if (__map->waiters) {           \
129                         ta += 1;                \
130                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
131                         __map->waiters--;       \
132                         st_cond_signal(__map->cond);    \
133                 }                               \
134         }while(0)
135
136 #define signal_mapnode(__mn)                    \
137         do {                                    \
138                 if (__mn->waiters) {            \
139                         ta += __mn->waiters;    \
140                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
141                         __mn->waiters = 0;      \
142                         st_cond_broadcast(__mn->cond);  \
143                 }                               \
144         }while(0)
145
146
147 struct map {
148         uint32_t flags;
149         uint64_t size;
150         uint32_t volumelen;
151         char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
152         xhash_t *objects;       /* obj_index --> map_node */
153         uint32_t ref;
154         uint32_t waiters;
155         st_cond_t cond;
156 };
157
158 struct mapperd {
159         xport bportno;          /* blocker that accesses data */
160         xport mbportno;         /* blocker that accesses maps */
161         xhash_t *hashmaps; // hash_function(target) --> struct map
162 };
163
164 struct mapper_io {
165         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
166         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
167         struct map_node *copyup_node;
168         volatile int err;                       /* error flag */
169         volatile uint64_t del_pending;
170         uint64_t delobj;
171         uint64_t dcobj;
172         cb_t cb;
173         enum mapper_state state;
174         volatile int active;
175 };
176
177 /* global vars */
178 struct mapperd *mapper;
179
180 void print_map(struct map *m);
181
182
183 void custom_peer_usage()
184 {
185         fprintf(stderr, "Custom peer options: \n"
186                         "-bp  : port for block blocker(!)\n"
187                         "-mbp : port for map blocker\n"
188                         "\n");
189 }
190
191
192 /*
193  * Helper functions
194  */
195
196 static inline struct mapperd * __get_mapperd(struct peerd *peer)
197 {
198         return (struct mapperd *) peer->priv;
199 }
200
201 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
202 {
203         return (struct mapper_io *) pr->priv;
204 }
205
206 static inline uint64_t calc_map_obj(struct map *map)
207 {
208         if (map->size == -1)
209                 return 0;
210         uint64_t nr_objs = map->size / block_size;
211         if (map->size % block_size)
212                 nr_objs++;
213         return nr_objs;
214 }
215
216 static uint32_t calc_nr_obj(struct xseg_request *req)
217 {
218         unsigned int r = 1;
219         uint64_t rem_size = req->size;
220         uint64_t obj_offset = req->offset & (block_size -1); //modulo
221         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
222         rem_size -= obj_size;
223         while (rem_size > 0) {
224                 obj_size = (rem_size > block_size) ? block_size : rem_size;
225                 rem_size -= obj_size;
226                 r++;
227         }
228
229         return r;
230 }
231
232 /*
233  * Maps handling functions
234  */
235
236 static struct map * find_map(struct mapperd *mapper, char *volume)
237 {
238         struct map *m = NULL;
239         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
240                                 (xhashidx *) &m);
241         if (r < 0)
242                 return NULL;
243         return m;
244 }
245
246 static struct map * find_map_len(struct mapperd *mapper, char *target,
247                                         uint32_t targetlen, uint32_t flags)
248 {
249         char buf[XSEG_MAX_TARGETLEN+1];
250         if (flags & MF_ARCHIP){
251                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
252                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
253                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
254                 targetlen += MAPPER_PREFIX_LEN;
255         }
256         else {
257                 strncpy(buf, target, targetlen);
258                 buf[targetlen] = 0;
259         }
260
261         if (targetlen > MAX_VOLUME_LEN){
262                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
263                                         targetlen, MAX_VOLUME_LEN);
264                 return NULL;
265         }
266
267         XSEGLOG2(&lc, D, "looking up map %s, len %u",
268                         buf, targetlen);
269         return find_map(mapper, buf);
270 }
271
272
273 static int insert_map(struct mapperd *mapper, struct map *map)
274 {
275         int r = -1;
276
277         if (find_map(mapper, map->volume)){
278                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
279                 goto out;
280         }
281
282         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
283                         map->volume, strlen(map->volume), (unsigned long) map);
284         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
285         while (r == -XHASH_ERESIZE) {
286                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
287                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
288                 if (!new_hashmap){
289                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
290                                         (unsigned long long) shift);
291                         goto out;
292                 }
293                 mapper->hashmaps = new_hashmap;
294                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
295         }
296 out:
297         return r;
298 }
299
300 static int remove_map(struct mapperd *mapper, struct map *map)
301 {
302         int r = -1;
303
304         //assert no pending pr on map
305
306         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
307         while (r == -XHASH_ERESIZE) {
308                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
309                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
310                 if (!new_hashmap){
311                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
312                                         (unsigned long long) shift);
313                         goto out;
314                 }
315                 mapper->hashmaps = new_hashmap;
316                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
317         }
318 out:
319         return r;
320 }
321
322 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
323 {
324         int r;
325         xport p;
326         struct peerd *peer = pr->peer;
327         struct xseg_request *req;
328         struct mapperd *mapper = __get_mapperd(peer);
329         void *dummy;
330
331         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
332
333         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
334         if (!req){
335                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
336                                 map->volume);
337                 goto out_err;
338         }
339
340         r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
341         if (r < 0){
342                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
343                                 map->volume);
344                 goto out_put;
345         }
346
347         char *reqtarget = xseg_get_target(peer->xseg, req);
348         if (!reqtarget)
349                 goto out_put;
350         strncpy(reqtarget, map->volume, req->targetlen);
351         req->op = X_CLOSE;
352         req->size = block_size;
353         req->offset = 0;
354         r = xseg_set_req_data(peer->xseg, req, pr);
355         if (r < 0){
356                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
357                                 map->volume);
358                 goto out_put;
359         }
360         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
361         if (p == NoPort){
362                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
363                                 map->volume);
364                 goto out_unset;
365         }
366         r = xseg_signal(peer->xseg, p);
367
368         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
369         return req;
370
371 out_unset:
372         xseg_get_req_data(peer->xseg, req, &dummy);
373 out_put:
374         xseg_put_request(peer->xseg, req, pr->portno);
375 out_err:
376         return NULL;
377 }
378
379 static int close_map(struct peer_req *pr, struct map *map)
380 {
381         int err;
382         struct xseg_request *req;
383         struct peerd *peer = pr->peer;
384
385         map->flags |= MF_MAP_CLOSING;
386         req = __close_map(pr, map);
387         if (!req)
388                 return -1;
389         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
390         map->flags &= ~MF_MAP_CLOSING;
391         err = req->state & XS_FAILED;
392         xseg_put_request(peer->xseg, req, pr->portno);
393         if (err)
394                 return -1;
395         return 0;
396 }
397
398 /*
399 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
400                                 char *target, uint32_t targetlen, struct map **m)
401 {
402         struct mapperd *mapper = __get_mapperd(peer);
403         int r;
404         *m = find_map(mapper, target, targetlen);
405         if (*m) {
406                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
407                 if ((*m)->flags & MF_MAP_NOT_READY) {
408                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
409                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
410                         return MF_PENDING;
411                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
412                 //      return -1;
413                 // 
414                 }else {
415                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
416                         return 0;
417                 }
418         }
419         r = open_map(peer, pr, target, targetlen, 0);
420         if (r < 0)
421                 return -1; //error
422         return MF_PENDING;      
423 }
424 */
425 /*
426  * Object handling functions
427  */
428
429 struct map_node *find_object(struct map *map, uint64_t obj_index)
430 {
431         struct map_node *mn;
432         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
433         if (r < 0)
434                 return NULL;
435         return mn;
436 }
437
438 static int insert_object(struct map *map, struct map_node *mn)
439 {
440         //FIXME no find object first
441         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
442         if (r == -XHASH_ERESIZE) {
443                 unsigned long shift = xhash_grow_size_shift(map->objects);
444                 map->objects = xhash_resize(map->objects, shift, NULL);
445                 if (!map->objects)
446                         return -1;
447                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
448         }
449         return r;
450 }
451
452
453 /*
454  * map read/write functions 
455  */
456 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
457 {
458         int i;
459         //hexlify sha256 value
460         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
461                 sprintf(mn->object+2*i, "%02x", buf[i]);
462         }
463
464         mn->object[SHA256_DIGEST_SIZE * 2] = 0;
465         mn->objectlen = SHA256_DIGEST_SIZE * 2;
466         mn->flags = MF_OBJECT_EXIST;
467 }
468
469 static inline void map_to_object(struct map_node *mn, char *buf)
470 {
471         char c = buf[0];
472         mn->flags = 0;
473         if (c)
474                 mn->flags |= MF_OBJECT_EXIST;
475         memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
476         mn->object[XSEG_MAX_TARGETLEN] = 0;
477         mn->objectlen = strlen(mn->object);
478 }
479
480 static inline void object_to_map(char* buf, struct map_node *mn)
481 {
482         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
483         memcpy(buf+1, mn->object, mn->objectlen);
484         /* zero out the rest of the buffer */
485         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen);
486 }
487
488 static inline void mapheader_to_map(struct map *m, char *buf)
489 {
490         uint64_t pos = 0;
491 //      memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
492 //      pos += SHA256_DIGEST_SIZE;
493         memcpy(buf + pos, &m->size, sizeof(m->size));
494         pos += sizeof(m->size);
495 }
496
497
498 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
499                                 struct map *map, struct map_node *mn)
500 {
501         void *dummy;
502         struct mapperd *mapper = __get_mapperd(peer);
503         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
504                                                         mapper->mbportno, X_ALLOC);
505         if (!req){
506                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
507                                 "(Map: %s [%llu]",
508                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
509                 goto out_err;
510         }
511         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
512         if (r < 0){
513                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
514                                 "(Map: %s [%llu]",
515                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
516                 goto out_put;
517         }
518         char *target = xseg_get_target(peer->xseg, req);
519         strncpy(target, map->volume, req->targetlen);
520         req->size = objectsize_in_map;
521         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
522         req->op = X_WRITE;
523         char *data = xseg_get_data(peer->xseg, req);
524         object_to_map(data, mn);
525
526         r = xseg_set_req_data(peer->xseg, req, pr);
527         if (r < 0){
528                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
529                                 "(Map: %s [%llu]",
530                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
531                 goto out_put;
532         }
533         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
534         if (p == NoPort){
535                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
536                                 "(Map: %s [%llu]",
537                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
538                 goto out_unset;
539         }
540         r = xseg_signal(peer->xseg, p);
541         if (r < 0)
542                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
543
544         XSEGLOG2(&lc, I, "Writing object %s \n\t"
545                         "Map: %s [%llu]",
546                         mn->object, map->volume, (unsigned long long) mn->objectidx);
547
548         return req;
549
550 out_unset:
551         xseg_get_req_data(peer->xseg, req, &dummy);
552 out_put:
553         xseg_put_request(peer->xseg, req, pr->portno);
554 out_err:
555         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
556                         "(Map: %s [%llu]",
557                         mn->object, map->volume, (unsigned long long) mn->objectidx);
558         return NULL;
559 }
560
561 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
562 {
563         void *dummy;
564         struct peerd *peer = pr->peer;
565         struct mapperd *mapper = __get_mapperd(peer);
566         struct map_node *mn;
567         uint64_t i, pos, max_objidx = calc_map_obj(map);
568         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
569                                                         mapper->mbportno, X_ALLOC);
570         if (!req){
571                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
572                 goto out_err;
573         }
574         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
575                         mapheader_size + max_objidx * objectsize_in_map);
576         if (r < 0){
577                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
578                 goto out_put;
579         }
580         char *target = xseg_get_target(peer->xseg, req);
581         strncpy(target, map->volume, req->targetlen);
582         char *data = xseg_get_data(peer->xseg, req);
583         mapheader_to_map(map, data);
584         pos = mapheader_size;
585         req->op = X_WRITE;
586         req->size = req->datalen;
587         req->offset = 0;
588
589         if (map->size % block_size)
590                 max_objidx++;
591         for (i = 0; i < max_objidx; i++) {
592                 mn = find_object(map, i);
593                 if (!mn){
594                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
595                                         (unsigned long long) i, map->volume);
596                         goto out_put;
597                 }
598                 object_to_map(data+pos, mn);
599                 pos += objectsize_in_map;
600         }
601         r = xseg_set_req_data(peer->xseg, req, pr);
602         if (r < 0){
603                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
604                                 map->volume);
605                 goto out_put;
606         }
607         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
608         if (p == NoPort){
609                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
610                                 map->volume);
611                 goto out_unset;
612         }
613         r = xseg_signal(peer->xseg, p);
614         if (r < 0)
615                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
616
617         map->flags |= MF_MAP_WRITING;
618         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
619         return req;
620
621 out_unset:
622         xseg_get_req_data(peer->xseg, req, &dummy);
623 out_put:
624         xseg_put_request(peer->xseg, req, pr->portno);
625 out_err:
626         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
627         return NULL;
628 }
629
630 static int write_map(struct peer_req* pr, struct map *map)
631 {
632         int r = 0;
633         struct peerd *peer = pr->peer;
634         map->flags |= MF_MAP_WRITING;
635         struct xseg_request *req = __write_map(pr, map);
636         if (!req)
637                 return -1;
638         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
639         if (req->state & XS_FAILED)
640                 r = -1;
641         xseg_put_request(peer->xseg, req, pr->portno);
642         map->flags &= ~MF_MAP_WRITING;
643         return r;
644 }
645
646 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
647 {
648         int r;
649         xport p;
650         struct xseg_request *req;
651         struct peerd *peer = pr->peer;
652         struct mapperd *mapper = __get_mapperd(peer);
653         void *dummy;
654
655         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
656
657         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
658         if (!req){
659                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
660                                 m->volume);
661                 goto out_fail;
662         }
663
664         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
665         if (r < 0){
666                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
667                                 m->volume);
668                 goto out_put;
669         }
670
671         char *reqtarget = xseg_get_target(peer->xseg, req);
672         if (!reqtarget)
673                 goto out_put;
674         strncpy(reqtarget, m->volume, req->targetlen);
675         req->op = X_READ;
676         req->size = block_size;
677         req->offset = 0;
678         r = xseg_set_req_data(peer->xseg, req, pr);
679         if (r < 0){
680                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
681                                 m->volume);
682                 goto out_put;
683         }
684         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
685         if (p == NoPort){ 
686                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
687                                 m->volume);
688                 goto out_unset;
689         }
690         r = xseg_signal(peer->xseg, p);
691         
692         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
693         return req;
694
695 out_unset:
696         xseg_get_req_data(peer->xseg, req, &dummy);
697 out_put:
698         xseg_put_request(peer->xseg, req, pr->portno);
699 out_fail:
700         return NULL;
701 }
702
703 static int read_map (struct map *map, char *buf)
704 {
705         char nulls[SHA256_DIGEST_SIZE];
706         memset(nulls, 0, SHA256_DIGEST_SIZE);
707
708         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
709         if (r) {
710                 XSEGLOG2(&lc, D, "Read zeros");
711                 //read error;
712                 return -1;
713         }
714         //type 1, our type, type 0 pithos map
715         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
716         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
717         uint64_t pos;
718         uint64_t i, nr_objs;
719         struct map_node *map_node;
720         if (type) {
721                 uint64_t pos = 0;
722                 map->size = *(uint64_t *) (buf + pos);
723                 pos += sizeof(uint64_t);
724                 nr_objs = map->size / block_size;
725                 if (map->size % block_size)
726                         nr_objs++;
727                 map_node = calloc(nr_objs, sizeof(struct map_node));
728                 if (!map_node)
729                         return -1;
730
731                 for (i = 0; i < nr_objs; i++) {
732                         map_node[i].map = map;
733                         map_node[i].objectidx = i;
734                         map_node[i].waiters = 0;
735                         map_node[i].ref = 1;
736                         map_node[i].cond = st_cond_new(); //FIXME err check;
737                         map_to_object(&map_node[i], buf + pos);
738                         pos += objectsize_in_map;
739                         r = insert_object(map, &map_node[i]); //FIXME error check
740                 }
741         } else {
742                 pos = 0;
743                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
744                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
745                 if (!map_node)
746                         return -1;
747                 for (i = 0; i < max_nr_objs; i++) {
748                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
749                                 break;
750                         map_node[i].objectidx = i;
751                         map_node[i].map = map;
752                         map_node[i].waiters = 0;
753                         map_node[i].ref = 1;
754                         map_node[i].cond = st_cond_new(); //FIXME err check;
755                         pithosmap_to_object(&map_node[i], buf + pos);
756                         pos += SHA256_DIGEST_SIZE;
757                         r = insert_object(map, &map_node[i]); //FIXME error check
758                 }
759                 map->size = i * block_size;
760         }
761         print_map(map);
762         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
763         return 0;
764
765         //FIXME cleanup on error
766 }
767
768 static int load_map(struct peer_req *pr, struct map *map)
769 {
770         int r = 0;
771         struct xseg_request *req;
772         struct peerd *peer = pr->peer;
773         map->flags |= MF_MAP_LOADING;
774         req = __load_map(pr, map);
775         if (!req)
776                 return -1;
777         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
778         map->flags &= ~MF_MAP_LOADING;
779         if (req->state & XS_FAILED){
780                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
781                 xseg_put_request(peer->xseg, req, pr->portno);
782                 return -1;
783         }
784         r = read_map(map, xseg_get_data(peer->xseg, req));
785         xseg_put_request(peer->xseg, req, pr->portno);
786         return r;
787 }
788
789 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
790                                                 uint32_t flags)
791 {
792         int r;
793         xport p;
794         struct xseg_request *req;
795         struct peerd *peer = pr->peer;
796         struct mapperd *mapper = __get_mapperd(peer);
797         void *dummy;
798
799         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
800
801         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
802         if (!req){
803                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
804                                 m->volume);
805                 goto out_fail;
806         }
807
808         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
809         if (r < 0){
810                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
811                                 m->volume);
812                 goto out_put;
813         }
814
815         char *reqtarget = xseg_get_target(peer->xseg, req);
816         if (!reqtarget)
817                 goto out_put;
818         strncpy(reqtarget, m->volume, req->targetlen);
819         req->op = X_OPEN;
820         req->size = block_size;
821         req->offset = 0;
822         if (!(flags & MF_FORCE))
823                 req->flags = XF_NOSYNC;
824         r = xseg_set_req_data(peer->xseg, req, pr);
825         if (r < 0){
826                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
827                                 m->volume);
828                 goto out_put;
829         }
830         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
831         if (p == NoPort){ 
832                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
833                                 m->volume);
834                 goto out_unset;
835         }
836         r = xseg_signal(peer->xseg, p);
837         
838         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
839         return req;
840
841 out_unset:
842         xseg_get_req_data(peer->xseg, req, &dummy);
843 out_put:
844         xseg_put_request(peer->xseg, req, pr->portno);
845 out_fail:
846         return NULL;
847 }
848
849 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
850 {
851         int err;
852         struct xseg_request *req;
853         struct peerd *peer = pr->peer;
854
855         map->flags |= MF_MAP_OPENING;
856         req = __open_map(pr, map, flags);
857         if (!req)
858                 return -1;
859         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
860         map->flags &= ~MF_MAP_OPENING;
861         err = req->state & XS_FAILED;
862         xseg_put_request(peer->xseg, req, pr->portno);
863         if (err)
864                 return -1;
865         else 
866                 map->flags |= MF_MAP_EXCLUSIVE;
867         return 0;
868 }
869
870 /*
871  * copy up functions
872  */
873
874 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
875 {
876         int r = 0;
877         if (mn){
878                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
879                 if (r == -XHASH_ERESIZE) {
880                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
881                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
882                         if (!new_hashmap)
883                                 goto out;
884                         mio->copyups_nodes = new_hashmap;
885                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
886                 }
887         }
888         else {
889                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
890                 if (r == -XHASH_ERESIZE) {
891                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
892                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
893                         if (!new_hashmap)
894                                 goto out;
895                         mio->copyups_nodes = new_hashmap;
896                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
897                 }
898         }
899 out:
900         return r;
901 }
902
903 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
904 {
905         struct map_node *mn;
906         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
907         if (r < 0)
908                 return NULL;
909         return mn;
910 }
911
912 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
913 {
914         struct mapperd *mapper = __get_mapperd(peer);
915         struct mapper_io *mio = __get_mapper_io(pr);
916         struct map *map = mn->map;
917         void *dummy;
918         int r = -1;
919         xport p;
920
921         uint32_t newtargetlen;
922         char new_target[XSEG_MAX_TARGETLEN + 1];
923         strncpy(new_target, map->volume, map->volumelen);
924         sprintf(new_target + map->volumelen, "_%u", mn->objectidx);
925         new_target[XSEG_MAX_TARGETLEN] = 0;
926         newtargetlen = strlen(new_target);
927
928         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
929                 goto copyup_zeroblock;
930
931         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
932                                                 mapper->bportno, X_ALLOC);
933         if (!req){
934                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
935                 goto out_err;
936         }
937         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
938                                 sizeof(struct xseg_request_copy));
939         if (r < 0){
940                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
941                 goto out_put;
942         }
943
944         char *target = xseg_get_target(peer->xseg, req);
945         strncpy(target, new_target, req->targetlen);
946
947         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
948         strncpy(xcopy->target, mn->object, mn->objectlen);
949         xcopy->targetlen = mn->objectlen;
950
951         req->offset = 0;
952         req->size = block_size;
953         req->op = X_COPY;
954         r = xseg_set_req_data(peer->xseg, req, pr);
955         if (r<0){
956                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
957                 goto out_put;
958         }
959         r = __set_copyup_node(mio, req, mn);
960         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
961         if (p == NoPort) {
962                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
963                 goto out_unset;
964         }
965         xseg_signal(peer->xseg, p);
966 //      mio->copyups++;
967
968         mn->flags |= MF_OBJECT_COPYING;
969         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
970         return req;
971
972 out_unset:
973         r = __set_copyup_node(mio, req, NULL);
974         xseg_get_req_data(peer->xseg, req, &dummy);
975 out_put:
976         xseg_put_request(peer->xseg, req, pr->portno);
977 out_err:
978         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
979         return NULL;
980
981 copyup_zeroblock:
982         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
983                         "Proceeding in writing the new object in map");
984         /* construct a tmp map_node for writing purposes */
985         struct map_node newmn = *mn;
986         newmn.flags = MF_OBJECT_EXIST;
987         strncpy(newmn.object, new_target, newtargetlen);
988         newmn.object[newtargetlen] = 0;
989         newmn.objectlen = newtargetlen;
990         newmn.objectidx = mn->objectidx; 
991         req = object_write(peer, pr, map, &newmn);
992         r = __set_copyup_node(mio, req, mn);
993         if (!req){
994                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
995                                 "\n\t of map %s [%llu]",
996                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
997                 return NULL;
998         }
999         mn->flags |= MF_OBJECT_WRITING;
1000         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1001         return req;
1002 }
1003
1004 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
1005 {
1006         void *dummy;
1007         struct peerd *peer = pr->peer;
1008         struct mapperd *mapper = __get_mapperd(peer);
1009         struct mapper_io *mio = __get_mapper_io(pr);
1010         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1011                                                         mapper->bportno, X_ALLOC);
1012         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1013         if (!req){
1014                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1015                 goto out_err;
1016         }
1017         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1018         if (r < 0){
1019                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1020                 goto out_put;
1021         }
1022         char *target = xseg_get_target(peer->xseg, req);
1023         strncpy(target, mn->object, req->targetlen);
1024         req->op = X_DELETE;
1025         req->size = req->datalen;
1026         req->offset = 0;
1027         r = xseg_set_req_data(peer->xseg, req, pr);
1028         if (r < 0){
1029                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1030                 goto out_put;
1031         }
1032         __set_copyup_node(mio, req, mn);
1033         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1034         if (p == NoPort){
1035                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1036                 goto out_unset;
1037         }
1038         r = xseg_signal(peer->xseg, p);
1039         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1040         return req;
1041
1042 out_unset:
1043         xseg_get_req_data(peer->xseg, req, &dummy);
1044 out_put:
1045         xseg_put_request(peer->xseg, req, pr->portno);
1046 out_err:
1047         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1048         return NULL;
1049 }
1050
1051 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1052 {
1053         void *dummy;
1054         struct peerd *peer = pr->peer;
1055         struct mapperd *mapper = __get_mapperd(peer);
1056         struct mapper_io *mio = __get_mapper_io(pr);
1057         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1058                                                         mapper->mbportno, X_ALLOC);
1059         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1060         if (!req){
1061                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1062                 goto out_err;
1063         }
1064         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1065         if (r < 0){
1066                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1067                 goto out_put;
1068         }
1069         char *target = xseg_get_target(peer->xseg, req);
1070         strncpy(target, map->volume, req->targetlen);
1071         req->op = X_DELETE;
1072         req->size = req->datalen;
1073         req->offset = 0;
1074         r = xseg_set_req_data(peer->xseg, req, pr);
1075         if (r < 0){
1076                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1077                 goto out_put;
1078         }
1079         __set_copyup_node(mio, req, NULL);
1080         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1081         if (p == NoPort){
1082                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1083                 goto out_unset;
1084         }
1085         r = xseg_signal(peer->xseg, p);
1086         map->flags |= MF_MAP_DELETING;
1087         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1088         return req;
1089
1090 out_unset:
1091         xseg_get_req_data(peer->xseg, req, &dummy);
1092 out_put:
1093         xseg_put_request(peer->xseg, req, pr->portno);
1094 out_err:
1095         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1096         return  NULL;
1097 }
1098
1099
1100 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1101 {
1102         struct map_node *mn = find_object(map, index);
1103         if (mn)
1104                 mn->ref++;
1105         return mn;
1106 }
1107
1108 static inline void put_mapnode(struct map_node *mn)
1109 {
1110         mn->ref--;
1111         if (!mn->ref){
1112                 //clean up mn
1113                 st_cond_destroy(mn->cond);
1114         }
1115 }
1116
1117 static inline void __get_map(struct map *map)
1118 {
1119         map->ref++;
1120 }
1121
1122 static inline void put_map(struct map *map)
1123 {
1124         struct map_node *mn;
1125         map->ref--;
1126         if (!map->ref){
1127                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1128                 //clean up map
1129                 uint64_t i;
1130                 for (i = 0; i < calc_map_obj(map); i++) {
1131                         mn = get_mapnode(map, i);
1132                         if (mn) {
1133                                 //make sure all pending operations on all objects are completed
1134                                 if (mn->flags & MF_OBJECT_NOT_READY){
1135                                         //this should never happen...
1136                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1137                                 }
1138                                 mn->flags &= MF_OBJECT_DESTROYED;
1139                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1140                                 put_mapnode(mn); //matcing get_mapnode;
1141                                 //assert mn->ref == 0;
1142                         }
1143                 }
1144                 mn = find_object(map, 0);
1145                 if (mn)
1146                         free(mn);
1147                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1148                 free(map);
1149         }
1150 }
1151
1152 static struct map * create_map(struct mapperd *mapper, char *name,
1153                                 uint32_t namelen, uint32_t flags)
1154 {
1155         int r;
1156         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1157                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1158                                         namelen, MAX_VOLUME_LEN);
1159                 return NULL;
1160         }
1161         struct map *m = malloc(sizeof(struct map));
1162         if (!m){
1163                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1164                 goto out_err;
1165         }
1166         m->size = -1;
1167         if (flags & MF_ARCHIP){
1168                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1169                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1170                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1171                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1172         }
1173         else {
1174                 strncpy(m->volume, name, namelen);
1175                 m->volume[namelen] = 0;
1176                 m->volumelen = namelen;
1177         }
1178         m->flags = 0;
1179         m->objects = xhash_new(3, INTEGER); 
1180         if (!m->objects){
1181                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1182                                 m->volume);
1183                 goto out_map;
1184         }
1185         m->ref = 1;
1186         m->waiters = 0;
1187         m->cond = st_cond_new(); //FIXME err check;
1188         r = insert_map(mapper, m);
1189         if (r < 0){
1190                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1191                 goto out_hash;
1192         }
1193
1194         return m;
1195
1196 out_hash:
1197         xhash_free(m->objects);
1198 out_map:
1199         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1200         free(m);
1201 out_err:
1202         return NULL;
1203 }
1204
1205
1206
1207 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1208 {
1209         struct peerd *peer = pr->peer;
1210         struct mapperd *mapper = __get_mapperd(peer);
1211         (void)mapper;
1212         struct mapper_io *mio = __get_mapper_io(pr);
1213         struct map_node *mn = __get_copyup_node(mio, req);
1214
1215         mio->del_pending--;
1216         if (req->state & XS_FAILED){
1217                 mio->err = 1;
1218         }
1219         signal_mapnode(mn);
1220         xseg_put_request(peer->xseg, req, pr->portno);
1221         signal_pr(pr);
1222 }
1223
1224 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1225 {
1226         struct peerd *peer = pr->peer;
1227         struct mapperd *mapper = __get_mapperd(peer);
1228         (void)mapper;
1229         struct mapper_io *mio = __get_mapper_io(pr);
1230         struct map_node *mn = __get_copyup_node(mio, req);
1231         if (!mn){
1232                 XSEGLOG2(&lc, E, "Cannot get map node");
1233                 goto out_err;
1234         }
1235         __set_copyup_node(mio, req, NULL);
1236
1237         if (req->state & XS_FAILED){
1238                 XSEGLOG2(&lc, E, "Req failed");
1239                 mn->flags &= ~MF_OBJECT_COPYING;
1240                 mn->flags &= ~MF_OBJECT_WRITING;
1241                 goto out_err;
1242         }
1243         if (req->op == X_WRITE) {
1244                 char *target = xseg_get_target(peer->xseg, req);
1245                 (void)target;
1246                 //printf("handle object write replyi\n");
1247                 __set_copyup_node(mio, req, NULL);
1248                 //assert mn->flags & MF_OBJECT_WRITING
1249                 mn->flags &= ~MF_OBJECT_WRITING;
1250
1251                 struct map_node tmp;
1252                 char *data = xseg_get_data(peer->xseg, req);
1253                 map_to_object(&tmp, data);
1254                 mn->flags |= MF_OBJECT_EXIST;
1255                 if (mn->flags != MF_OBJECT_EXIST){
1256                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1257                         goto out_err;
1258                 }
1259                 //assert mn->flags & MF_OBJECT_EXIST
1260                 strncpy(mn->object, tmp.object, tmp.objectlen);
1261                 mn->object[tmp.objectlen] = 0;
1262                 mn->objectlen = tmp.objectlen;
1263                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1264                 mio->copyups--;
1265                 signal_mapnode(mn);
1266         } else if (req->op == X_COPY) {
1267         //      issue write_object;
1268                 mn->flags &= ~MF_OBJECT_COPYING;
1269                 struct map *map = mn->map;
1270                 if (!map){
1271                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1272                         goto out_err;
1273                 }
1274
1275                 /* construct a tmp map_node for writing purposes */
1276                 char *target = xseg_get_target(peer->xseg, req);
1277                 struct map_node newmn = *mn;
1278                 newmn.flags = MF_OBJECT_EXIST;
1279                 strncpy(newmn.object, target, req->targetlen);
1280                 newmn.object[req->targetlen] = 0;
1281                 newmn.objectlen = req->targetlen;
1282                 newmn.objectidx = mn->objectidx; 
1283                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1284                 if (!xreq){
1285                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1286                                         "\n\t of map %s [%llu]",
1287                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1288                         goto out_err;
1289                 }
1290                 mn->flags |= MF_OBJECT_WRITING;
1291                 __set_copyup_node (mio, xreq, mn);
1292
1293                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1294         } else {
1295                 //wtf??
1296                 ;
1297         }
1298
1299 out:
1300         xseg_put_request(peer->xseg, req, pr->portno);
1301         return;
1302
1303 out_err:
1304         mio->copyups--;
1305         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1306         mio->err = 1;
1307         if (mn)
1308                 signal_mapnode(mn);
1309         goto out;
1310
1311 }
1312
1313 struct r2o {
1314         struct map_node *mn;
1315         uint64_t offset;
1316         uint64_t size;
1317 };
1318
1319 static int req2objs(struct peer_req *pr, struct map *map, int write)
1320 {
1321         int r = 0;
1322         struct peerd *peer = pr->peer;
1323         struct mapper_io *mio = __get_mapper_io(pr);
1324         char *target = xseg_get_target(peer->xseg, pr->req);
1325         uint32_t nr_objs = calc_nr_obj(pr->req);
1326         uint64_t size = sizeof(struct xseg_reply_map) + 
1327                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1328         uint32_t idx, i, ready;
1329         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1330         struct map_node *mn;
1331         mio->copyups = 0;
1332         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1333
1334         /* get map_nodes of request */
1335         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1336         if (!mns){
1337                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1338                 return -1;
1339         }
1340         idx = 0;
1341         rem_size = pr->req->size;
1342         obj_index = pr->req->offset / block_size;
1343         obj_offset = pr->req->offset & (block_size -1); //modulo
1344         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1345         mn = get_mapnode(map, obj_index);
1346         if (!mn) {
1347                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1348                 r = -1;
1349                 goto out;
1350         }
1351         mns[idx].mn = mn;
1352         mns[idx].offset = obj_offset;
1353         mns[idx].size = obj_size;
1354         rem_size -= obj_size;
1355         while (rem_size > 0) {
1356                 idx++;
1357                 obj_index++;
1358                 obj_offset = 0;
1359                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1360                 rem_size -= obj_size;
1361                 mn = get_mapnode(map, obj_index);
1362                 if (!mn) {
1363                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1364                         r = -1;
1365                         goto out;
1366                 }
1367                 mns[idx].mn = mn;
1368                 mns[idx].offset = obj_offset;
1369                 mns[idx].size = obj_size;
1370         }
1371         if (write) {
1372                 ready = 0;
1373                 int can_wait = 0;
1374                 mio->cb=copyup_cb;
1375                 while (ready < (idx + 1)){
1376                         ready = 0;
1377                         for (i = 0; i < (idx+1); i++) {
1378                                 mn = mns[i].mn;
1379                                 //do copyups
1380                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1381                                         if (can_wait){
1382                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1383                                                 if (mn->flags & MF_OBJECT_DELETED){
1384                                                         mio->err = 1;
1385                                                 }
1386                                                 if (mio->err){
1387                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1388                                                         if (!mio->copyups){
1389                                                                 r = -1;
1390                                                                 goto out;
1391                                                         }
1392                                                 }
1393                                         }
1394                                 }
1395                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1396                                         //calc new_target, copy up object
1397                                         if (copyup_object(peer, mn, pr) == NULL){
1398                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1399                                                 mio->err = 1;
1400                                         } else {
1401                                                 mio->copyups++;
1402                                         }
1403                                 } else {
1404                                         ready++;
1405                                 }
1406                         }
1407                         can_wait = 1;
1408                 }
1409                 /*
1410 pending_copyups:
1411                 while(mio->copyups > 0){
1412                         mio->cb = copyup_cb;
1413                         wait_on_pr(pr, 0);
1414                         ta--;
1415                         st_cond_wait(pr->cond);
1416                 }
1417                 */
1418         }
1419
1420         if (mio->err){
1421                 r = -1;
1422                 XSEGLOG2(&lc, E, "Mio->err");
1423                 goto out;
1424         }
1425
1426         /* resize request to fit reply */
1427         char buf[XSEG_MAX_TARGETLEN];
1428         strncpy(buf, target, pr->req->targetlen);
1429         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1430         if (r < 0) {
1431                 XSEGLOG2(&lc, E, "Cannot resize request");
1432                 goto out;
1433         }
1434         target = xseg_get_target(peer->xseg, pr->req);
1435         strncpy(target, buf, pr->req->targetlen);
1436
1437         /* structure reply */
1438         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1439         reply->cnt = nr_objs;
1440         for (i = 0; i < (idx+1); i++) {
1441                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1442                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1443                 reply->segs[i].offset = mns[i].offset;
1444                 reply->segs[i].size = mns[i].size;
1445         }
1446 out:
1447         for (i = 0; i < idx; i++) {
1448                 put_mapnode(mns[i].mn);
1449         }
1450         free(mns);
1451         return r;
1452 }
1453
1454 static int do_dropcache(struct peer_req *pr, struct map *map)
1455 {
1456         struct map_node *mn;
1457         struct peerd *peer = pr->peer;
1458         struct mapperd *mapper = __get_mapperd(peer);
1459         uint64_t i;
1460         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1461         map->flags |= MF_MAP_DROPPING_CACHE;
1462         for (i = 0; i < calc_map_obj(map); i++) {
1463                 mn = get_mapnode(map, i);
1464                 if (mn) {
1465                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1466                                 //make sure all pending operations on all objects are completed
1467                                 if (mn->flags & MF_OBJECT_NOT_READY){
1468                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1469                                 }
1470                                 mn->flags &= MF_OBJECT_DESTROYED;
1471                         }
1472                         put_mapnode(mn);
1473                 }
1474         }
1475         map->flags &= ~MF_MAP_DROPPING_CACHE;
1476         map->flags |= MF_MAP_DESTROYED;
1477         remove_map(mapper, map);
1478         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1479         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1480         return 0;
1481 }
1482
1483 static int do_info(struct peer_req *pr, struct map *map)
1484 {
1485         struct peerd *peer = pr->peer;
1486         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1487         xinfo->size = map->size;
1488         return 0;
1489 }
1490
1491
1492 static int do_close(struct peer_req *pr, struct map *map)
1493 {
1494 //      struct peerd *peer = pr->peer;
1495 //      struct xseg_request *req;
1496         if (map->flags & MF_MAP_EXCLUSIVE) 
1497                 close_map(pr, map);
1498         return do_dropcache(pr, map);
1499 }
1500
1501 static int do_destroy(struct peer_req *pr, struct map *map)
1502 {
1503         uint64_t i;
1504         struct peerd *peer = pr->peer;
1505         struct mapper_io *mio = __get_mapper_io(pr);
1506         struct map_node *mn;
1507         struct xseg_request *req;
1508         
1509         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1510         map->flags |= MF_MAP_DELETING;
1511         req = delete_map(pr, map);
1512         if (!req)
1513                 return -1;
1514         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1515         if (req->state & XS_FAILED){
1516                 xseg_put_request(peer->xseg, req, pr->portno);
1517                 map->flags &= ~MF_MAP_DELETING;
1518                 return -1;
1519         }
1520         xseg_put_request(peer->xseg, req, pr->portno);
1521         //FIXME
1522         uint64_t nr_obj = calc_map_obj(map);
1523         uint64_t deleted = 0;
1524         while (deleted < nr_obj){ 
1525                 deleted = 0;
1526                 for (i = 0; i < nr_obj; i++){
1527                         mn = get_mapnode(map, i);
1528                         if (mn) {
1529                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1530                                         if (mn->flags & MF_OBJECT_EXIST){
1531                                                 //make sure all pending operations on all objects are completed
1532                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1533                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1534                                                 }
1535                                                 req = delete_object(pr, mn);
1536                                                 if (!req)
1537                                                         if (mio->del_pending){
1538                                                                 goto wait_pending;
1539                                                         } else {
1540                                                                 continue;
1541                                                         }
1542                                                 else {
1543                                                         mio->del_pending++;
1544                                                 }
1545                                         }
1546                                         mn->flags &= MF_OBJECT_DESTROYED;
1547                                 }
1548                                 put_mapnode(mn);
1549                         }
1550                         deleted++;
1551                 }
1552 wait_pending:
1553                 mio->cb = deletion_cb;
1554                 wait_on_pr(pr, mio->del_pending > 0);
1555         }
1556         mio->cb = NULL;
1557         map->flags &= ~MF_MAP_DELETING;
1558         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1559         return do_close(pr, map);
1560 }
1561
1562 static int do_mapr(struct peer_req *pr, struct map *map)
1563 {
1564         struct peerd *peer = pr->peer;
1565         int r = req2objs(pr, map, 0);
1566         if  (r < 0){
1567                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1568                                 map->volume, 
1569                                 (unsigned long long) pr->req->offset, 
1570                                 (unsigned long long) (pr->req->offset + pr->req->size));
1571                 return -1;
1572         }
1573         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1574                         map->volume, 
1575                         (unsigned long long) pr->req->offset, 
1576                         (unsigned long long) (pr->req->offset + pr->req->size));
1577         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1578                         (unsigned long long) pr->req->offset,
1579                         (unsigned long long) pr->req->size);
1580         char buf[XSEG_MAX_TARGETLEN+1];
1581         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1582         int i;
1583         for (i = 0; i < reply->cnt; i++) {
1584                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1585                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1586                 buf[reply->segs[i].targetlen] = 0;
1587                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1588                                 (unsigned long long) reply->segs[i].offset,
1589                                 (unsigned long long) reply->segs[i].size);
1590         }
1591         return 0;
1592 }
1593
1594 static int do_mapw(struct peer_req *pr, struct map *map)
1595 {
1596         struct peerd *peer = pr->peer;
1597         int r = req2objs(pr, map, 1);
1598         if  (r < 0){
1599                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1600                                 map->volume, 
1601                                 (unsigned long long) pr->req->offset, 
1602                                 (unsigned long long) (pr->req->offset + pr->req->size));
1603                 return -1;
1604         }
1605         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1606                         map->volume, 
1607                         (unsigned long long) pr->req->offset, 
1608                         (unsigned long long) (pr->req->offset + pr->req->size));
1609         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1610                         (unsigned long long) pr->req->offset,
1611                         (unsigned long long) pr->req->size);
1612         char buf[XSEG_MAX_TARGETLEN+1];
1613         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1614         int i;
1615         for (i = 0; i < reply->cnt; i++) {
1616                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1617                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1618                 buf[reply->segs[i].targetlen] = 0;
1619                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1620                                 (unsigned long long) reply->segs[i].offset,
1621                                 (unsigned long long) reply->segs[i].size);
1622         }
1623         return 0;
1624 }
1625
1626 //here map is the parent map
1627 static int do_clone(struct peer_req *pr, struct map *map)
1628 {
1629         /*
1630         FIXME check if clone map exists
1631         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1632         if (clonemap)
1633                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1634                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1635                 return -1;
1636         */
1637
1638         int r;
1639         char buf[XSEG_MAX_TARGETLEN];
1640         struct peerd *peer = pr->peer;
1641         struct mapperd *mapper = __get_mapperd(peer);
1642         char *target = xseg_get_target(peer->xseg, pr->req);
1643         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1644         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1645         struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
1646                                                 MF_ARCHIP);
1647         if (!clonemap) 
1648                 return -1;
1649
1650         if (xclone->size == -1)
1651                 clonemap->size = map->size;
1652         else
1653                 clonemap->size = xclone->size;
1654         if (clonemap->size < map->size){
1655                 target = xseg_get_target(peer->xseg, pr->req);
1656                 strncpy(buf, target, pr->req->targetlen);
1657                 buf[pr->req->targetlen] = 0;
1658                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1659                                 "\n\t for requested clone %s",
1660                                 (unsigned long long) xclone->size,
1661                                 (unsigned long long) map->size, buf);
1662                 goto out_err;
1663         }
1664
1665         //alloc and init map_nodes
1666         unsigned long c = clonemap->size/block_size + 1;
1667         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1668         if (!map_nodes){
1669                 goto out_err;
1670         }
1671         int i;
1672         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1673                 struct map_node *mn = get_mapnode(map, i);
1674                 if (mn) {
1675                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1676                         map_nodes[i].objectlen = mn->objectlen;
1677                         put_mapnode(mn);
1678                 } else {
1679                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1680                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1681                 }
1682                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1683                 map_nodes[i].flags = 0;
1684                 map_nodes[i].objectidx = i;
1685                 map_nodes[i].map = clonemap;
1686                 map_nodes[i].ref = 1;
1687                 map_nodes[i].waiters = 0;
1688                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1689                 r = insert_object(clonemap, &map_nodes[i]);
1690                 if (r < 0){
1691                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1692                         goto out_err;
1693                 }
1694         }
1695         r = write_map(pr, clonemap);
1696         if (r < 0){
1697                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1698                 goto out_err;
1699         }
1700         return 0;
1701
1702 out_err:
1703         put_map(clonemap);
1704         return -1;
1705 }
1706
1707 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1708 {
1709         int r, opened = 0;
1710         if (flags & MF_EXCLUSIVE){
1711                 r = open_map(pr, map, flags);
1712                 if (r < 0) {
1713                         if (flags & MF_FORCE){
1714                                 return -1;
1715                         }
1716                 } else {
1717                         opened = 1;
1718                 }
1719         }
1720         r = load_map(pr, map);
1721         if (r < 0 && opened){
1722                 close_map(pr, map);
1723         }
1724         return r;
1725 }
1726
1727 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1728                         uint32_t flags)
1729 {
1730         int r;
1731         struct peerd *peer = pr->peer;
1732         struct mapperd *mapper = __get_mapperd(peer);
1733         struct map *map = find_map_len(mapper, name, namelen, flags);
1734         if (!map){
1735                 if (flags & MF_LOAD){
1736                         map = create_map(mapper, name, namelen, flags);
1737                         if (!map)
1738                                 return NULL;
1739                         r = open_load_map(pr, map, flags);
1740                         if (r < 0){
1741                                 do_dropcache(pr, map);
1742                                 return NULL;
1743                         }
1744                 } else {
1745                         return NULL;
1746                 }
1747         } else if (map->flags & MF_MAP_DESTROYED){
1748                 return NULL;
1749         }
1750         __get_map(map);
1751         return map;
1752
1753 }
1754
1755 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1756                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1757 {
1758         //struct peerd *peer = pr->peer;
1759         struct map *map;
1760 start:
1761         map = get_map(pr, name, namelen, flags);
1762         if (!map)
1763                 return -1;
1764         if (map->flags & MF_MAP_NOT_READY){
1765                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1766                 put_map(map);
1767                 goto start;
1768         }
1769         int r = action(pr, map);
1770         //always drop cache if map not read exclusively
1771         if (!(map->flags & MF_MAP_EXCLUSIVE))
1772                 do_dropcache(pr, map);
1773         signal_map(map);
1774         put_map(map);
1775         return r;
1776 }
1777
1778 void * handle_info(struct peer_req *pr)
1779 {
1780         struct peerd *peer = pr->peer;
1781         char *target = xseg_get_target(peer->xseg, pr->req);
1782         int r = map_action(do_info, pr, target, pr->req->targetlen,
1783                                 MF_ARCHIP|MF_LOAD);
1784         if (r < 0)
1785                 fail(peer, pr);
1786         else
1787                 complete(peer, pr);
1788         ta--;
1789         return NULL;
1790 }
1791
1792 void * handle_clone(struct peer_req *pr)
1793 {
1794         int r;
1795         struct peerd *peer = pr->peer;
1796         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1797         if (!xclone) {
1798                 r = -1;
1799                 goto out;
1800         }
1801         if (xclone->targetlen){
1802                 //support clone only from pithos
1803                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1804                                         MF_LOAD);
1805         } else {
1806                 if (!xclone->size){
1807                         r = -1;
1808                 } else {
1809                         //FIXME
1810                         struct map *map;
1811                         char *target = xseg_get_target(peer->xseg, pr->req);
1812                         XSEGLOG2(&lc, I, "Creating volume");
1813                         map = get_map(pr, target, pr->req->targetlen,
1814                                                 MF_ARCHIP|MF_LOAD);
1815                         if (map){
1816                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1817                                 if (map->ref <= 2) //initial one + one ref from __get_map
1818                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1819                                 put_map(map); //matches get_map
1820                                 r = -1;
1821                                 goto out;
1822                         }
1823                         //create a new empty map of size
1824                         map = create_map(mapper, target, pr->req->targetlen,
1825                                                 MF_ARCHIP);
1826                         if (!map){
1827                                 r = -1;
1828                                 goto out;
1829                         }
1830                         map->size = xclone->size;
1831                         //populate_map with zero objects;
1832                         uint64_t nr_objs = xclone->size / block_size;
1833                         if (xclone->size % block_size)
1834                                 nr_objs++;
1835
1836                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1837                         if (!map_nodes){
1838                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1839                                 r = -1;
1840                                 goto out;
1841                         }
1842                         uint64_t i;
1843                         for (i = 0; i < nr_objs; i++) {
1844                                 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1845                                 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1846                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1847                                 map_nodes[i].flags = 0;
1848                                 map_nodes[i].objectidx = i;
1849                                 map_nodes[i].map = map;
1850                                 map_nodes[i].ref = 1;
1851                                 map_nodes[i].waiters = 0;
1852                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1853                                 r = insert_object(map, &map_nodes[i]);
1854                                 if (r < 0){
1855                                         do_dropcache(pr, map);
1856                                         r = -1;
1857                                         goto out;
1858                                 }
1859                         }
1860                         r = write_map(pr, map);
1861                         if (r < 0){
1862                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1863                                 do_dropcache(pr, map);
1864                                 goto out;
1865                         }
1866                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1867                         r = 0;
1868                         do_dropcache(pr, map); //drop cache here for consistency
1869                 }
1870         }
1871 out:
1872         if (r < 0)
1873                 fail(peer, pr);
1874         else
1875                 complete(peer, pr);
1876         ta--;
1877         return NULL;
1878 }
1879
1880 void * handle_mapr(struct peer_req *pr)
1881 {
1882         struct peerd *peer = pr->peer;
1883         char *target = xseg_get_target(peer->xseg, pr->req);
1884         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1885                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1886         if (r < 0)
1887                 fail(peer, pr);
1888         else
1889                 complete(peer, pr);
1890         ta--;
1891         return NULL;
1892 }
1893
1894 void * handle_mapw(struct peer_req *pr)
1895 {
1896         struct peerd *peer = pr->peer;
1897         char *target = xseg_get_target(peer->xseg, pr->req);
1898         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1899                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1900         if (r < 0)
1901                 fail(peer, pr);
1902         else
1903                 complete(peer, pr);
1904         XSEGLOG2(&lc, D, "Ta: %d", ta);
1905         ta--;
1906         return NULL;
1907 }
1908
1909 void * handle_destroy(struct peer_req *pr)
1910 {
1911         struct peerd *peer = pr->peer;
1912         char *target = xseg_get_target(peer->xseg, pr->req);
1913         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
1914                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1915         if (r < 0)
1916                 fail(peer, pr);
1917         else
1918                 complete(peer, pr);
1919         ta--;
1920         return NULL;
1921 }
1922
1923 void * handle_close(struct peer_req *pr)
1924 {
1925         struct peerd *peer = pr->peer;
1926         char *target = xseg_get_target(peer->xseg, pr->req);
1927         //here we do not want to load
1928         int r = map_action(do_close, pr, target, pr->req->targetlen,
1929                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
1930         if (r < 0)
1931                 fail(peer, pr);
1932         else
1933                 complete(peer, pr);
1934         ta--;
1935         return NULL;
1936 }
1937
1938 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1939                         struct xseg_request *req)
1940 {
1941         //struct mapperd *mapper = __get_mapperd(peer);
1942         struct mapper_io *mio = __get_mapper_io(pr);
1943         void *(*action)(struct peer_req *) = NULL;
1944
1945         mio->state = ACCEPTED;
1946         mio->err = 0;
1947         mio->cb = NULL;
1948         switch (pr->req->op) {
1949                 /* primary xseg operations of mapper */
1950                 case X_CLONE: action = handle_clone; break;
1951                 case X_MAPR: action = handle_mapr; break;
1952                 case X_MAPW: action = handle_mapw; break;
1953 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1954                 case X_INFO: action = handle_info; break;
1955                 case X_DELETE: action = handle_destroy; break;
1956                 case X_CLOSE: action = handle_close; break;
1957                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1958         }
1959         if (action){
1960                 ta++;
1961                 mio->active = 1;
1962                 st_thread_create(action, pr, 0, 0);
1963         }
1964         return 0;
1965
1966 }
1967
1968 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1969                 enum dispatch_reason reason)
1970 {
1971         struct mapperd *mapper = __get_mapperd(peer);
1972         (void) mapper;
1973         struct mapper_io *mio = __get_mapper_io(pr);
1974         (void) mio;
1975
1976
1977         if (reason == dispatch_accept)
1978                 dispatch_accepted(peer, pr, req);
1979         else {
1980                 if (mio->cb){
1981                         mio->cb(pr, req);
1982                 } else { 
1983                         signal_pr(pr);
1984                 }
1985         }
1986         return 0;
1987 }
1988
1989 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1990 {
1991         int i;
1992 //      unsigned char buf[SHA256_DIGEST_SIZE];
1993 //      unsigned char *zero;
1994
1995         //gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1996
1997         /* Version check should be the very first call because it
1998           makes sure that important subsystems are intialized. */
1999 //              gcry_check_version (NULL);
2000
2001         /* Disable secure memory.  */
2002 //      gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
2003
2004         /* Tell Libgcrypt that initialization has completed. */
2005 //      gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
2006
2007         /* calculate out magic sha hash value */
2008 //      gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
2009
2010         /* calculate zero block */
2011         //FIXME check hash value
2012 //      zero = malloc(block_size);
2013 //      memset(zero, 0, block_size);
2014 //      gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
2015 //      for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
2016 //              sprintf(zero_block + 2*i, "%02x", buf[i]);
2017 //      printf("%s \n", zero_block);
2018 //      free(zero);
2019
2020         //FIXME error checks
2021         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2022         peer->priv = mapperd;
2023         mapper = mapperd;
2024         mapper->hashmaps = xhash_new(3, STRING);
2025         
2026         for (i = 0; i < peer->nr_ops; i++) {
2027                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2028                 mio->copyups_nodes = xhash_new(3, INTEGER);
2029                 mio->copyups = 0;
2030                 mio->err = 0;
2031                 mio->active = 0;
2032                 peer->peer_reqs[i].priv = mio;
2033         }
2034
2035         for (i = 0; i < argc; i++) {
2036                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2037                         mapper->bportno = atoi(argv[i+1]);
2038                         i += 1;
2039                         continue;
2040                 }
2041                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2042                         mapper->mbportno = atoi(argv[i+1]);
2043                         i += 1;
2044                         continue;
2045                 }
2046                 /* enforce only one thread */
2047                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2048                         int t = atoi(argv[i+1]);
2049                         if (t != 1) {
2050                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2051                                 return -1;
2052                         }
2053                         i += 1;
2054                         continue;
2055                 }
2056         }
2057
2058         const struct sched_param param = { .sched_priority = 99 };
2059         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2060
2061
2062 //      test_map(peer);
2063
2064         return 0;
2065 }
2066
2067 void print_obj(struct map_node *mn)
2068 {
2069         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2070                         (unsigned long long) mn->objectidx, mn->object, 
2071                         (unsigned int) mn->objectlen, 
2072                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2073 }
2074
2075 void print_map(struct map *m)
2076 {
2077         uint64_t nr_objs = m->size/block_size;
2078         if (m->size % block_size)
2079                 nr_objs++;
2080         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2081                         m->volume, m->volumelen, 
2082                         (unsigned long long) m->size, 
2083                         (unsigned long long) nr_objs);
2084         uint64_t i;
2085         struct map_node *mn;
2086         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2087                 return;
2088         for (i = 0; i < nr_objs; i++) {
2089                 mn = find_object(m, i);
2090                 if (!mn){
2091                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2092                         continue;
2093                 }
2094                 print_obj(mn);
2095         }
2096 }
2097
2098 /*
2099 void test_map(struct peerd *peer)
2100 {
2101         int i,j, ret;
2102         //struct sha256_ctx sha256ctx;
2103         unsigned char buf[SHA256_DIGEST_SIZE];
2104         char buf_new[XSEG_MAX_TARGETLEN + 20];
2105         struct map *m = malloc(sizeof(struct map));
2106         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2107         m->volume[XSEG_MAX_TARGETLEN] = 0;
2108         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2109         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2110         m->volumelen = XSEG_MAX_TARGETLEN;
2111         m->size = 100*block_size;
2112         m->objects = xhash_new(3, INTEGER);
2113         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2114         for (i = 0; i < 100; i++) {
2115                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2116                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2117                 
2118                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2119                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2120                 }
2121                 map_node[i].objectidx = i;
2122                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2123                 map_node[i].flags = MF_OBJECT_EXIST;
2124                 ret = insert_object(m, &map_node[i]);
2125         }
2126
2127         char *data = malloc(block_size);
2128         mapheader_to_map(m, data);
2129         uint64_t pos = mapheader_size;
2130
2131         for (i = 0; i < 100; i++) {
2132                 map_node = find_object(m, i);
2133                 if (!map_node){
2134                         printf("no object node %d \n", i);
2135                         exit(1);
2136                 }
2137                 object_to_map(data+pos, map_node);
2138                 pos += objectsize_in_map;
2139         }
2140 //      print_map(m);
2141
2142         struct map *m2 = malloc(sizeof(struct map));
2143         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2144         m->volume[XSEG_MAX_TARGETLEN] = 0;
2145         m->volumelen = XSEG_MAX_TARGETLEN;
2146
2147         m2->objects = xhash_new(3, INTEGER);
2148         ret = read_map(peer, m2, data);
2149 //      print_map(m2);
2150
2151         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2152         ssize_t r, sum = 0;
2153         while (sum < block_size) {
2154                 r = write(fd, data + sum, block_size -sum);
2155                 if (r < 0){
2156                         perror("write");
2157                         printf("write error\n");
2158                         exit(1);
2159                 } 
2160                 sum += r;
2161         }
2162         close(fd);
2163         map_node = find_object(m, 0);
2164         free(map_node);
2165         free(m);
2166 }
2167 */