c04edaa4c5c2f5f29e33cc363473fb062554f55e
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <sched.h>
15 #include <sys/syscall.h>
16 #include <openssl/sha.h>
17
18 /* general mapper flags */
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22 #define MF_ARCHIP       (1 << 3)
23
24 #ifndef SHA256_DIGEST_SIZE
25 #define SHA256_DIGEST_SIZE 32
26 #endif
27 /* hex representation of sha256 value takes up double the sha256 size */
28 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
29
30 #define block_size (1<<22) //FIXME this should be defined here?
31
32 /* transparency byte + max object len in disk */
33 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
34
35 /* Map header contains:
36  *      map version
37  *      volume size
38  */
39 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
40
41
42 #define MAPPER_PREFIX "archip_"
43 #define MAPPER_PREFIX_LEN 7
44
45 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
47
48 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
50 #endif
51
52 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
53
54 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
56 #endif
57
58 #define MAX_VOLUME_SIZE \
59 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
60
61
62 char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
64
65 /* dispatch_internal mapper states */
66 enum mapper_state {
67         ACCEPTED = 0,
68         WRITING = 1,
69         COPYING = 2,
70         DELETING = 3,
71         DROPPING_CACHE = 4
72 };
73
74 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
75
76
77 /* mapper object flags */
78 #define MF_OBJECT_EXIST         (1 << 0)
79 #define MF_OBJECT_COPYING       (1 << 1)
80 #define MF_OBJECT_WRITING       (1 << 2)
81 #define MF_OBJECT_DELETING      (1 << 3)
82 #define MF_OBJECT_DESTROYED     (1 << 5)
83
84 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
85                                         MF_OBJECT_DELETING)
86 struct map_node {
87         uint32_t flags;
88         uint32_t objectidx;
89         uint32_t objectlen;
90         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
91         struct map *map;
92         uint32_t ref;
93         uint32_t waiters;
94         st_cond_t cond;
95 };
96
97
98 #define wait_on_pr(__pr, __condition__)         \
99         while (__condition__){                  \
100                 ta--;                           \
101                 __get_mapper_io(pr)->active = 0;\
102                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
103                 st_cond_wait(__pr->cond);       \
104         }
105
106 #define wait_on_mapnode(__mn, __condition__)    \
107         while (__condition__){                  \
108                 ta--;                           \
109                 __mn->waiters++;                \
110                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
111                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
112                 st_cond_wait(__mn->cond);       \
113         }
114
115 #define wait_on_map(__map, __condition__)       \
116         while (__condition__){                  \
117                 ta--;                           \
118                 __map->waiters++;               \
119                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
120                                    __map, __map->volume, __map->waiters, ta); \
121                 st_cond_wait(__map->cond);      \
122         }
123
124 #define signal_pr(__pr)                         \
125         do {                                    \
126                 if (!__get_mapper_io(pr)->active){\
127                         ta++;                   \
128                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
129                         __get_mapper_io(pr)->active = 1;\
130                         st_cond_signal(__pr->cond);     \
131                 }                               \
132         }while(0)
133
134 #define signal_map(__map)                       \
135         do {                                    \
136                 if (__map->waiters) {           \
137                         ta += 1;                \
138                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
139                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
140                         __map->waiters--;       \
141                         st_cond_signal(__map->cond);    \
142                 }                               \
143         }while(0)
144
145 #define signal_mapnode(__mn)                    \
146         do {                                    \
147                 if (__mn->waiters) {            \
148                         ta += __mn->waiters;    \
149                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
150                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
151                         __mn->waiters = 0;      \
152                         st_cond_broadcast(__mn->cond);  \
153                 }                               \
154         }while(0)
155
156
157 /* map flags */
158 #define MF_MAP_LOADING          (1 << 0)
159 #define MF_MAP_DESTROYED        (1 << 1)
160 #define MF_MAP_WRITING          (1 << 2)
161 #define MF_MAP_DELETING         (1 << 3)
162 #define MF_MAP_DROPPING_CACHE   (1 << 4)
163 #define MF_MAP_EXCLUSIVE        (1 << 5)
164 #define MF_MAP_OPENING          (1 << 6)
165 #define MF_MAP_CLOSING          (1 << 7)
166 #define MF_MAP_DELETED          (1 << 8)
167
168 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
170
171 struct map {
172         uint32_t version;
173         uint32_t flags;
174         uint64_t size;
175         uint32_t volumelen;
176         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
177         xhash_t *objects;       /* obj_index --> map_node */
178         uint32_t ref;
179         uint32_t waiters;
180         st_cond_t cond;
181 };
182
183 struct mapperd {
184         xport bportno;          /* blocker that accesses data */
185         xport mbportno;         /* blocker that accesses maps */
186         xhash_t *hashmaps; // hash_function(target) --> struct map
187 };
188
189 struct mapper_io {
190         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
191         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
192         struct map_node *copyup_node;
193         volatile int err;                       /* error flag */
194         volatile uint64_t del_pending;
195         uint64_t delobj;
196         uint64_t dcobj;
197         cb_t cb;
198         enum mapper_state state;
199         volatile int active;
200 };
201
202 /* global vars */
203 struct mapperd *mapper;
204
205 void print_map(struct map *m);
206
207
208 void custom_peer_usage()
209 {
210         fprintf(stderr, "Custom peer options: \n"
211                         "-bp  : port for block blocker(!)\n"
212                         "-mbp : port for map blocker\n"
213                         "\n");
214 }
215
216
217 /*
218  * Helper functions
219  */
220
221 static inline struct mapperd * __get_mapperd(struct peerd *peer)
222 {
223         return (struct mapperd *) peer->priv;
224 }
225
226 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
227 {
228         return (struct mapper_io *) pr->priv;
229 }
230
231 static inline uint64_t calc_map_obj(struct map *map)
232 {
233         if (map->size == -1)
234                 return 0;
235         uint64_t nr_objs = map->size / block_size;
236         if (map->size % block_size)
237                 nr_objs++;
238         return nr_objs;
239 }
240
241 static uint32_t calc_nr_obj(struct xseg_request *req)
242 {
243         unsigned int r = 1;
244         uint64_t rem_size = req->size;
245         uint64_t obj_offset = req->offset & (block_size -1); //modulo
246         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247         rem_size -= obj_size;
248         while (rem_size > 0) {
249                 obj_size = (rem_size > block_size) ? block_size : rem_size;
250                 rem_size -= obj_size;
251                 r++;
252         }
253
254         return r;
255 }
256
257 /* hexlify function. 
258  * Unsafe. Doesn't check if data length is odd!
259  */
260 static void hexlify(unsigned char *data, char *hex)
261 {
262         int i;
263         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264                 sprintf(hex+2*i, "%02x", data[i]);
265 }
266
267 static void unhexlify(char *hex, unsigned char *data)
268 {
269         int i;
270         char c;
271         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
272                 data[i] = 0;
273                 c = hex[2*i];
274                 if (isxdigit(c)){
275                         if (isdigit(c)){
276                                 c-= '0';
277                         }
278                         else {
279                                 c = tolower(c);
280                                 c = c-'a' + 10;
281                         }
282                 }
283                 else {
284                         c = 0;
285                 }
286                 data[i] |= (c << 4) & 0xF0;
287                 c = hex[2*i+1];
288                 if (isxdigit(c)){
289                         if (isdigit(c)){
290                                 c-= '0';
291                         }
292                         else {
293                                 c = tolower(c);
294                                 c = c-'a' + 10;
295                         }
296                 }
297                 else {
298                         c = 0;
299                 }
300                 data[i] |= c & 0x0F;
301         }
302 }
303 /*
304  * Maps handling functions
305  */
306
307 static struct map * find_map(struct mapperd *mapper, char *volume)
308 {
309         struct map *m = NULL;
310         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
311                                 (xhashidx *) &m);
312         if (r < 0)
313                 return NULL;
314         return m;
315 }
316
317 static struct map * find_map_len(struct mapperd *mapper, char *target,
318                                         uint32_t targetlen, uint32_t flags)
319 {
320         char buf[XSEG_MAX_TARGETLEN+1];
321         if (flags & MF_ARCHIP){
322                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325                 targetlen += MAPPER_PREFIX_LEN;
326         }
327         else {
328                 strncpy(buf, target, targetlen);
329                 buf[targetlen] = 0;
330         }
331
332         if (targetlen > MAX_VOLUME_LEN){
333                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334                                         targetlen, MAX_VOLUME_LEN);
335                 return NULL;
336         }
337
338         XSEGLOG2(&lc, D, "looking up map %s, len %u",
339                         buf, targetlen);
340         return find_map(mapper, buf);
341 }
342
343
344 static int insert_map(struct mapperd *mapper, struct map *map)
345 {
346         int r = -1;
347
348         if (find_map(mapper, map->volume)){
349                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
350                 goto out;
351         }
352
353         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
354                         map->volume, strlen(map->volume), (unsigned long) map);
355         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356         while (r == -XHASH_ERESIZE) {
357                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
359                 if (!new_hashmap){
360                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361                                         (unsigned long long) shift);
362                         goto out;
363                 }
364                 mapper->hashmaps = new_hashmap;
365                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
366         }
367 out:
368         return r;
369 }
370
371 static int remove_map(struct mapperd *mapper, struct map *map)
372 {
373         int r = -1;
374
375         //assert no pending pr on map
376
377         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378         while (r == -XHASH_ERESIZE) {
379                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
381                 if (!new_hashmap){
382                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383                                         (unsigned long long) shift);
384                         goto out;
385                 }
386                 mapper->hashmaps = new_hashmap;
387                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
388         }
389 out:
390         return r;
391 }
392
393 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
394 {
395         int r;
396         xport p;
397         struct peerd *peer = pr->peer;
398         struct xseg_request *req;
399         struct mapperd *mapper = __get_mapperd(peer);
400         void *dummy;
401
402         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
403
404         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
405         if (!req){
406                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
407                                 map->volume);
408                 goto out_err;
409         }
410
411         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
412         if (r < 0){
413                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
414                                 map->volume);
415                 goto out_put;
416         }
417
418         char *reqtarget = xseg_get_target(peer->xseg, req);
419         if (!reqtarget)
420                 goto out_put;
421         strncpy(reqtarget, map->volume, req->targetlen);
422         req->op = X_CLOSE;
423         req->size = 0;
424         req->offset = 0;
425         r = xseg_set_req_data(peer->xseg, req, pr);
426         if (r < 0){
427                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
428                                 map->volume);
429                 goto out_put;
430         }
431         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
432         if (p == NoPort){
433                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
434                                 map->volume);
435                 goto out_unset;
436         }
437         r = xseg_signal(peer->xseg, p);
438         map->flags |= MF_MAP_CLOSING;
439
440         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
441         return req;
442
443 out_unset:
444         xseg_get_req_data(peer->xseg, req, &dummy);
445 out_put:
446         xseg_put_request(peer->xseg, req, pr->portno);
447 out_err:
448         return NULL;
449 }
450
451 static int close_map(struct peer_req *pr, struct map *map)
452 {
453         int err;
454         struct xseg_request *req;
455         struct peerd *peer = pr->peer;
456
457         req = __close_map(pr, map);
458         if (!req)
459                 return -1;
460         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461         map->flags &= ~MF_MAP_CLOSING;
462         err = req->state & XS_FAILED;
463         xseg_put_request(peer->xseg, req, pr->portno);
464         if (err)
465                 return -1;
466         return 0;
467 }
468
469 /*
470 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
471                                 char *target, uint32_t targetlen, struct map **m)
472 {
473         struct mapperd *mapper = __get_mapperd(peer);
474         int r;
475         *m = find_map(mapper, target, targetlen);
476         if (*m) {
477                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478                 if ((*m)->flags & MF_MAP_NOT_READY) {
479                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
480                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
481                         return MF_PENDING;
482                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
483                 //      return -1;
484                 // 
485                 }else {
486                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
487                         return 0;
488                 }
489         }
490         r = open_map(peer, pr, target, targetlen, 0);
491         if (r < 0)
492                 return -1; //error
493         return MF_PENDING;      
494 }
495 */
496 /*
497  * Object handling functions
498  */
499
500 struct map_node *find_object(struct map *map, uint64_t obj_index)
501 {
502         struct map_node *mn;
503         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
504         if (r < 0)
505                 return NULL;
506         return mn;
507 }
508
509 static int insert_object(struct map *map, struct map_node *mn)
510 {
511         //FIXME no find object first
512         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513         if (r == -XHASH_ERESIZE) {
514                 unsigned long shift = xhash_grow_size_shift(map->objects);
515                 map->objects = xhash_resize(map->objects, shift, NULL);
516                 if (!map->objects)
517                         return -1;
518                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
519         }
520         return r;
521 }
522
523
524 /*
525  * map read/write functions 
526  */
527 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
528 {
529         hexlify(buf, mn->object);
530         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532         mn->flags = MF_OBJECT_EXIST;
533 }
534
535 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
536 {
537         char c = buf[0];
538         mn->flags = 0;
539         if (c){
540                 mn->flags |= MF_OBJECT_EXIST;
541                 strcpy(mn->object, MAPPER_PREFIX);
542                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543                 mn->object[MAX_OBJECT_LEN] = 0;
544                 mn->objectlen = strlen(mn->object);
545         }
546         else {
547                 hexlify(buf+1, mn->object);
548                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549                 mn->objectlen = strlen(mn->object);
550         }
551
552 }
553
554 static inline void object_to_map(char* buf, struct map_node *mn)
555 {
556         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
557         if (buf[0]){
558                 /* strip common prefix */
559                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
560         }
561         else {
562                 unhexlify(mn->object, (unsigned char *)(buf+1));
563         }
564 }
565
566 static inline void mapheader_to_map(struct map *m, char *buf)
567 {
568         uint64_t pos = 0;
569         memcpy(buf + pos, &m->version, sizeof(m->version));
570         pos += sizeof(m->version);
571         memcpy(buf + pos, &m->size, sizeof(m->size));
572         pos += sizeof(m->size);
573 }
574
575
576 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577                                 struct map *map, struct map_node *mn)
578 {
579         void *dummy;
580         struct mapperd *mapper = __get_mapperd(peer);
581         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582                                                         mapper->mbportno, X_ALLOC);
583         if (!req){
584                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
585                                 "(Map: %s [%llu]",
586                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
587                 goto out_err;
588         }
589         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
590         if (r < 0){
591                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
592                                 "(Map: %s [%llu]",
593                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
594                 goto out_put;
595         }
596         char *target = xseg_get_target(peer->xseg, req);
597         strncpy(target, map->volume, req->targetlen);
598         req->size = objectsize_in_map;
599         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
600         req->op = X_WRITE;
601         char *data = xseg_get_data(peer->xseg, req);
602         object_to_map(data, mn);
603
604         r = xseg_set_req_data(peer->xseg, req, pr);
605         if (r < 0){
606                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
607                                 "(Map: %s [%llu]",
608                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
609                 goto out_put;
610         }
611         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
612         if (p == NoPort){
613                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
614                                 "(Map: %s [%llu]",
615                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
616                 goto out_unset;
617         }
618         r = xseg_signal(peer->xseg, p);
619         if (r < 0)
620                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
621
622         XSEGLOG2(&lc, I, "Writing object %s \n\t"
623                         "Map: %s [%llu]",
624                         mn->object, map->volume, (unsigned long long) mn->objectidx);
625
626         return req;
627
628 out_unset:
629         xseg_get_req_data(peer->xseg, req, &dummy);
630 out_put:
631         xseg_put_request(peer->xseg, req, pr->portno);
632 out_err:
633         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
634                         "(Map: %s [%llu]",
635                         mn->object, map->volume, (unsigned long long) mn->objectidx);
636         return NULL;
637 }
638
639 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
640 {
641         void *dummy;
642         struct peerd *peer = pr->peer;
643         struct mapperd *mapper = __get_mapperd(peer);
644         struct map_node *mn;
645         uint64_t i, pos, max_objidx = calc_map_obj(map);
646         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647                                                         mapper->mbportno, X_ALLOC);
648         if (!req){
649                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
650                 goto out_err;
651         }
652         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653                         mapheader_size + max_objidx * objectsize_in_map);
654         if (r < 0){
655                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
656                 goto out_put;
657         }
658         char *target = xseg_get_target(peer->xseg, req);
659         strncpy(target, map->volume, req->targetlen);
660         char *data = xseg_get_data(peer->xseg, req);
661         mapheader_to_map(map, data);
662         pos = mapheader_size;
663         req->op = X_WRITE;
664         req->size = req->datalen;
665         req->offset = 0;
666
667         if (map->size % block_size)
668                 max_objidx++;
669         for (i = 0; i < max_objidx; i++) {
670                 mn = find_object(map, i);
671                 if (!mn){
672                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673                                         (unsigned long long) i, map->volume);
674                         goto out_put;
675                 }
676                 object_to_map(data+pos, mn);
677                 pos += objectsize_in_map;
678         }
679         r = xseg_set_req_data(peer->xseg, req, pr);
680         if (r < 0){
681                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
682                                 map->volume);
683                 goto out_put;
684         }
685         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
686         if (p == NoPort){
687                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
688                                 map->volume);
689                 goto out_unset;
690         }
691         r = xseg_signal(peer->xseg, p);
692         if (r < 0)
693                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
694
695         map->flags |= MF_MAP_WRITING;
696         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
697         return req;
698
699 out_unset:
700         xseg_get_req_data(peer->xseg, req, &dummy);
701 out_put:
702         xseg_put_request(peer->xseg, req, pr->portno);
703 out_err:
704         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
705         return NULL;
706 }
707
708 static int write_map(struct peer_req* pr, struct map *map)
709 {
710         int r = 0;
711         struct peerd *peer = pr->peer;
712         struct xseg_request *req = __write_map(pr, map);
713         if (!req)
714                 return -1;
715         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
716         if (req->state & XS_FAILED)
717                 r = -1;
718         xseg_put_request(peer->xseg, req, pr->portno);
719         map->flags &= ~MF_MAP_WRITING;
720         return r;
721 }
722
723 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
724 {
725         int r;
726         xport p;
727         struct xseg_request *req;
728         struct peerd *peer = pr->peer;
729         struct mapperd *mapper = __get_mapperd(peer);
730         void *dummy;
731
732         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
733
734         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
735         if (!req){
736                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
737                                 m->volume);
738                 goto out_fail;
739         }
740
741         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
742         if (r < 0){
743                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
744                                 m->volume);
745                 goto out_put;
746         }
747
748         char *reqtarget = xseg_get_target(peer->xseg, req);
749         if (!reqtarget)
750                 goto out_put;
751         strncpy(reqtarget, m->volume, req->targetlen);
752         req->op = X_READ;
753         req->size = block_size;
754         req->offset = 0;
755         r = xseg_set_req_data(peer->xseg, req, pr);
756         if (r < 0){
757                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
758                                 m->volume);
759                 goto out_put;
760         }
761         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
762         if (p == NoPort){ 
763                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
764                                 m->volume);
765                 goto out_unset;
766         }
767         r = xseg_signal(peer->xseg, p);
768
769         m->flags |= MF_MAP_LOADING;
770         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
771         return req;
772
773 out_unset:
774         xseg_get_req_data(peer->xseg, req, &dummy);
775 out_put:
776         xseg_put_request(peer->xseg, req, pr->portno);
777 out_fail:
778         return NULL;
779 }
780
781 static int read_map (struct map *map, unsigned char *buf)
782 {
783         char nulls[SHA256_DIGEST_SIZE];
784         memset(nulls, 0, SHA256_DIGEST_SIZE);
785
786         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
787         if (r) {
788                 XSEGLOG2(&lc, D, "Read zeros");
789                 //read error;
790                 return -1;
791         }
792         //type 1, archip type, type 0 pithos map
793         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
795         uint64_t pos;
796         uint64_t i, nr_objs;
797         struct map_node *map_node;
798         if (type) {
799                 uint64_t pos = 0;
800                 map->version = *(uint32_t *) (buf + pos);
801                 pos += sizeof(uint32_t);
802                 map->size = *(uint64_t *) (buf + pos);
803                 pos += sizeof(uint64_t);
804                 nr_objs = map->size / block_size;
805                 if (map->size % block_size)
806                         nr_objs++;
807                 map_node = calloc(nr_objs, sizeof(struct map_node));
808                 if (!map_node)
809                         return -1;
810
811                 for (i = 0; i < nr_objs; i++) {
812                         map_node[i].map = map;
813                         map_node[i].objectidx = i;
814                         map_node[i].waiters = 0;
815                         map_node[i].ref = 1;
816                         map_node[i].cond = st_cond_new(); //FIXME err check;
817                         map_to_object(&map_node[i], buf + pos);
818                         pos += objectsize_in_map;
819                         r = insert_object(map, &map_node[i]); //FIXME error check
820                 }
821         } else {
822                 pos = 0;
823                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
825                 if (!map_node)
826                         return -1;
827                 for (i = 0; i < max_nr_objs; i++) {
828                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
829                                 break;
830                         map_node[i].objectidx = i;
831                         map_node[i].map = map;
832                         map_node[i].waiters = 0;
833                         map_node[i].ref = 1;
834                         map_node[i].cond = st_cond_new(); //FIXME err check;
835                         pithosmap_to_object(&map_node[i], buf + pos);
836                         pos += SHA256_DIGEST_SIZE;
837                         r = insert_object(map, &map_node[i]); //FIXME error check
838                 }
839                 map->size = i * block_size;
840         }
841         print_map(map);
842         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
843         return 0;
844
845         //FIXME cleanup on error
846 }
847
848 static int load_map(struct peer_req *pr, struct map *map)
849 {
850         int r = 0;
851         struct xseg_request *req;
852         struct peerd *peer = pr->peer;
853         req = __load_map(pr, map);
854         if (!req)
855                 return -1;
856         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
857         map->flags &= ~MF_MAP_LOADING;
858         if (req->state & XS_FAILED){
859                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
860                 xseg_put_request(peer->xseg, req, pr->portno);
861                 return -1;
862         }
863         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
864         xseg_put_request(peer->xseg, req, pr->portno);
865         return r;
866 }
867
868 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
869                                                 uint32_t flags)
870 {
871         int r;
872         xport p;
873         struct xseg_request *req;
874         struct peerd *peer = pr->peer;
875         struct mapperd *mapper = __get_mapperd(peer);
876         void *dummy;
877
878         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
879
880         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
881         if (!req){
882                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
883                                 m->volume);
884                 goto out_fail;
885         }
886
887         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
888         if (r < 0){
889                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
890                                 m->volume);
891                 goto out_put;
892         }
893
894         char *reqtarget = xseg_get_target(peer->xseg, req);
895         if (!reqtarget)
896                 goto out_put;
897         strncpy(reqtarget, m->volume, req->targetlen);
898         req->op = X_OPEN;
899         req->size = block_size;
900         req->offset = 0;
901         if (!(flags & MF_FORCE))
902                 req->flags = XF_NOSYNC;
903         r = xseg_set_req_data(peer->xseg, req, pr);
904         if (r < 0){
905                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
906                                 m->volume);
907                 goto out_put;
908         }
909         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
910         if (p == NoPort){ 
911                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
912                                 m->volume);
913                 goto out_unset;
914         }
915         r = xseg_signal(peer->xseg, p);
916
917         m->flags |= MF_MAP_OPENING;
918         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
919         return req;
920
921 out_unset:
922         xseg_get_req_data(peer->xseg, req, &dummy);
923 out_put:
924         xseg_put_request(peer->xseg, req, pr->portno);
925 out_fail:
926         return NULL;
927 }
928
929 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
930 {
931         int err;
932         struct xseg_request *req;
933         struct peerd *peer = pr->peer;
934
935         req = __open_map(pr, map, flags);
936         if (!req){
937                 return -1;
938         }
939         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
940         map->flags &= ~MF_MAP_OPENING;
941         err = req->state & XS_FAILED;
942         xseg_put_request(peer->xseg, req, pr->portno);
943         if (err)
944                 return -1;
945         else 
946                 map->flags |= MF_MAP_EXCLUSIVE;
947         return 0;
948 }
949
950 /*
951  * copy up functions
952  */
953
954 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
955 {
956         int r = 0;
957         if (mn){
958                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
959                                 req, mn, mio);
960                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
961                 if (r == -XHASH_ERESIZE) {
962                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
963                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
964                         if (!new_hashmap)
965                                 goto out;
966                         mio->copyups_nodes = new_hashmap;
967                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
968                 }
969                 if (r < 0)
970                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
971                                         req, mn, mio);
972         }
973         else {
974                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
975                                 req, mio);
976                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
977                 if (r == -XHASH_ERESIZE) {
978                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
979                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
980                         if (!new_hashmap)
981                                 goto out;
982                         mio->copyups_nodes = new_hashmap;
983                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
984                 }
985                 if (r < 0)
986                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
987                                         req, mio);
988         }
989 out:
990         return r;
991 }
992
993 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
994 {
995         struct map_node *mn;
996         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
997         if (r < 0){
998                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
999                 return NULL;
1000         }
1001         XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1002         return mn;
1003 }
1004
1005 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1006 {
1007         struct mapperd *mapper = __get_mapperd(peer);
1008         struct mapper_io *mio = __get_mapper_io(pr);
1009         struct map *map = mn->map;
1010         void *dummy;
1011         int r = -1;
1012         xport p;
1013
1014         uint32_t newtargetlen;
1015         char new_target[MAX_OBJECT_LEN + 1];
1016         unsigned char sha[SHA256_DIGEST_SIZE];
1017
1018         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1019
1020         char tmp[XSEG_MAX_TARGETLEN + 1];
1021         uint32_t tmplen;
1022         strncpy(tmp, map->volume, map->volumelen);
1023         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1024         tmp[XSEG_MAX_TARGETLEN] = 0;
1025         tmplen = strlen(tmp);
1026         SHA256((unsigned char *)tmp, tmplen, sha);
1027         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1028         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1029
1030
1031         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1032                 goto copyup_zeroblock;
1033
1034         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1035                                                 mapper->bportno, X_ALLOC);
1036         if (!req){
1037                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1038                 goto out_err;
1039         }
1040         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1041                                 sizeof(struct xseg_request_copy));
1042         if (r < 0){
1043                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1044                 goto out_put;
1045         }
1046
1047         char *target = xseg_get_target(peer->xseg, req);
1048         strncpy(target, new_target, req->targetlen);
1049
1050         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1051         strncpy(xcopy->target, mn->object, mn->objectlen);
1052         xcopy->targetlen = mn->objectlen;
1053
1054         req->offset = 0;
1055         req->size = block_size;
1056         req->op = X_COPY;
1057         r = xseg_set_req_data(peer->xseg, req, pr);
1058         if (r<0){
1059                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1060                 goto out_put;
1061         }
1062         r = __set_copyup_node(mio, req, mn);
1063         if (r < 0)
1064                 goto out_unset;
1065         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1066         if (p == NoPort) {
1067                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1068                 goto out_mapper_unset;
1069         }
1070         xseg_signal(peer->xseg, p);
1071 //      mio->copyups++;
1072
1073         mn->flags |= MF_OBJECT_COPYING;
1074         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1075         return req;
1076
1077 out_mapper_unset:
1078         __set_copyup_node(mio, req, NULL);
1079 out_unset:
1080         xseg_get_req_data(peer->xseg, req, &dummy);
1081 out_put:
1082         xseg_put_request(peer->xseg, req, pr->portno);
1083 out_err:
1084         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1085         return NULL;
1086
1087 copyup_zeroblock:
1088         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1089                         "Proceeding in writing the new object in map");
1090         /* construct a tmp map_node for writing purposes */
1091         struct map_node newmn = *mn;
1092         newmn.flags = MF_OBJECT_EXIST;
1093         strncpy(newmn.object, new_target, newtargetlen);
1094         newmn.object[newtargetlen] = 0;
1095         newmn.objectlen = newtargetlen;
1096         newmn.objectidx = mn->objectidx; 
1097         req = object_write(peer, pr, map, &newmn);
1098         r = __set_copyup_node(mio, req, mn);
1099         if (r < 0)
1100                 return NULL;
1101         if (!req){
1102                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1103                                 "\n\t of map %s [%llu]",
1104                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1105                 return NULL;
1106         }
1107         mn->flags |= MF_OBJECT_WRITING;
1108         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1109         return req;
1110 }
1111
1112 static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1113 {
1114         void *dummy;
1115         struct peerd *peer = pr->peer;
1116         struct mapperd *mapper = __get_mapperd(peer);
1117         struct mapper_io *mio = __get_mapper_io(pr);
1118         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1119                                                         mapper->bportno, X_ALLOC);
1120         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1121         if (!req){
1122                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1123                 goto out_err;
1124         }
1125         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1126         if (r < 0){
1127                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1128                 goto out_put;
1129         }
1130         char *target = xseg_get_target(peer->xseg, req);
1131         strncpy(target, mn->object, req->targetlen);
1132         req->op = X_DELETE;
1133         req->size = req->datalen;
1134         req->offset = 0;
1135         r = xseg_set_req_data(peer->xseg, req, pr);
1136         if (r < 0){
1137                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1138                 goto out_put;
1139         }
1140         r = __set_copyup_node(mio, req, mn);
1141         if (r < 0)
1142                 goto out_unset;
1143         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1144         if (p == NoPort){
1145                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1146                 goto out_mapper_unset;
1147         }
1148         r = xseg_signal(peer->xseg, p);
1149         mn->flags |= MF_OBJECT_DELETING;
1150         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1151         return req;
1152
1153 out_mapper_unset:
1154         __set_copyup_node(mio, req, NULL);
1155 out_unset:
1156         xseg_get_req_data(peer->xseg, req, &dummy);
1157 out_put:
1158         xseg_put_request(peer->xseg, req, pr->portno);
1159 out_err:
1160         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1161         return NULL;
1162 }
1163
1164 static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1165 {
1166         void *dummy;
1167         struct peerd *peer = pr->peer;
1168         struct mapperd *mapper = __get_mapperd(peer);
1169         struct mapper_io *mio = __get_mapper_io(pr);
1170         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1171                                                         mapper->mbportno, X_ALLOC);
1172         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1173         if (!req){
1174                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1175                 goto out_err;
1176         }
1177         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1178         if (r < 0){
1179                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1180                 goto out_put;
1181         }
1182         char *target = xseg_get_target(peer->xseg, req);
1183         strncpy(target, map->volume, req->targetlen);
1184         req->op = X_DELETE;
1185         req->size = req->datalen;
1186         req->offset = 0;
1187         r = xseg_set_req_data(peer->xseg, req, pr);
1188         if (r < 0){
1189                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1190                 goto out_put;
1191         }
1192         /* do not check return value. just make sure there is no node set */
1193         __set_copyup_node(mio, req, NULL);
1194         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1195         if (p == NoPort){
1196                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1197                 goto out_unset;
1198         }
1199         r = xseg_signal(peer->xseg, p);
1200         map->flags |= MF_MAP_DELETING;
1201         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1202         return req;
1203
1204 out_unset:
1205         xseg_get_req_data(peer->xseg, req, &dummy);
1206 out_put:
1207         xseg_put_request(peer->xseg, req, pr->portno);
1208 out_err:
1209         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1210         return  NULL;
1211 }
1212
1213
1214 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1215 {
1216         struct map_node *mn = find_object(map, index);
1217         if (mn)
1218                 mn->ref++;
1219         return mn;
1220 }
1221
1222 static inline void put_mapnode(struct map_node *mn)
1223 {
1224         mn->ref--;
1225         if (!mn->ref){
1226                 //clean up mn
1227                 st_cond_destroy(mn->cond);
1228         }
1229 }
1230
1231 static inline void __get_map(struct map *map)
1232 {
1233         map->ref++;
1234 }
1235
1236 static inline void put_map(struct map *map)
1237 {
1238         struct map_node *mn;
1239         map->ref--;
1240         if (!map->ref){
1241                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1242                 //clean up map
1243                 uint64_t i;
1244                 for (i = 0; i < calc_map_obj(map); i++) {
1245                         mn = get_mapnode(map, i);
1246                         if (mn) {
1247                                 //make sure all pending operations on all objects are completed
1248                                 //this should never happen...
1249                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1250                                 mn->flags |= MF_OBJECT_DESTROYED;
1251                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1252                                 put_mapnode(mn); //matcing get_mapnode;
1253                                 //assert mn->ref == 0;
1254                         }
1255                 }
1256                 mn = find_object(map, 0);
1257                 if (mn)
1258                         free(mn);
1259                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1260                 free(map);
1261         }
1262 }
1263
1264 static struct map * create_map(struct mapperd *mapper, char *name,
1265                                 uint32_t namelen, uint32_t flags)
1266 {
1267         int r;
1268         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1269                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1270                                         namelen, MAX_VOLUME_LEN);
1271                 return NULL;
1272         }
1273         struct map *m = malloc(sizeof(struct map));
1274         if (!m){
1275                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1276                 goto out_err;
1277         }
1278         m->size = -1;
1279         if (flags & MF_ARCHIP){
1280                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1281                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1282                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1283                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1284                 m->version = 1; /* keep this hardcoded for now */
1285         }
1286         else {
1287                 strncpy(m->volume, name, namelen);
1288                 m->volume[namelen] = 0;
1289                 m->volumelen = namelen;
1290                 m->version = 0; /* version 0 should be pithos maps */
1291         }
1292         m->flags = 0;
1293         m->objects = xhash_new(3, INTEGER); 
1294         if (!m->objects){
1295                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1296                                 m->volume);
1297                 goto out_map;
1298         }
1299         m->ref = 1;
1300         m->waiters = 0;
1301         m->cond = st_cond_new(); //FIXME err check;
1302         r = insert_map(mapper, m);
1303         if (r < 0){
1304                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1305                 goto out_hash;
1306         }
1307
1308         return m;
1309
1310 out_hash:
1311         xhash_free(m->objects);
1312 out_map:
1313         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1314         free(m);
1315 out_err:
1316         return NULL;
1317 }
1318
1319
1320
1321 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1322 {
1323         struct peerd *peer = pr->peer;
1324         struct mapperd *mapper = __get_mapperd(peer);
1325         (void)mapper;
1326         struct mapper_io *mio = __get_mapper_io(pr);
1327         struct map_node *mn = __get_copyup_node(mio, req);
1328
1329         __set_copyup_node(mio, req, NULL);
1330
1331         //assert req->op = X_DELETE;
1332         //assert pr->req->op = X_DELETE only map deletions make delete requests
1333         //assert mio->del_pending > 0
1334         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1335         mio->del_pending--;
1336
1337         if (req->state & XS_FAILED){
1338                 mio->err = 1;
1339         }
1340         if (mn){
1341                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1342                                 mn, mn->object, mio, req);
1343                 // assert mn->flags & MF_OBJECT_DELETING
1344                 mn->flags &= ~MF_OBJECT_DELETING;
1345                 mn->flags |= MF_OBJECT_DESTROYED;
1346                 signal_mapnode(mn);
1347                 /* put mapnode here, matches get_mapnode on do_destroy */
1348                 put_mapnode(mn);
1349         } else {
1350                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1351                                 mio, req);
1352         }
1353         xseg_put_request(peer->xseg, req, pr->portno);
1354         signal_pr(pr);
1355 }
1356
1357 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1358 {
1359         struct peerd *peer = pr->peer;
1360         struct mapperd *mapper = __get_mapperd(peer);
1361         (void)mapper;
1362         struct mapper_io *mio = __get_mapper_io(pr);
1363         struct map_node *mn = __get_copyup_node(mio, req);
1364         if (!mn){
1365                 XSEGLOG2(&lc, E, "Cannot get map node");
1366                 goto out_err;
1367         }
1368         __set_copyup_node(mio, req, NULL);
1369
1370         if (req->state & XS_FAILED){
1371                 XSEGLOG2(&lc, E, "Req failed");
1372                 mn->flags &= ~MF_OBJECT_COPYING;
1373                 mn->flags &= ~MF_OBJECT_WRITING;
1374                 goto out_err;
1375         }
1376         if (req->op == X_WRITE) {
1377                 char *target = xseg_get_target(peer->xseg, req);
1378                 (void)target;
1379                 //printf("handle object write replyi\n");
1380                 __set_copyup_node(mio, req, NULL);
1381                 //assert mn->flags & MF_OBJECT_WRITING
1382                 mn->flags &= ~MF_OBJECT_WRITING;
1383
1384                 struct map_node tmp;
1385                 char *data = xseg_get_data(peer->xseg, req);
1386                 map_to_object(&tmp, (unsigned char *) data);
1387                 mn->flags |= MF_OBJECT_EXIST;
1388                 if (mn->flags != MF_OBJECT_EXIST){
1389                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1390                         goto out_err;
1391                 }
1392                 //assert mn->flags & MF_OBJECT_EXIST
1393                 strncpy(mn->object, tmp.object, tmp.objectlen);
1394                 mn->object[tmp.objectlen] = 0;
1395                 mn->objectlen = tmp.objectlen;
1396                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1397                 mio->copyups--;
1398                 signal_mapnode(mn);
1399                 signal_pr(pr);
1400         } else if (req->op == X_COPY) {
1401         //      issue write_object;
1402                 mn->flags &= ~MF_OBJECT_COPYING;
1403                 struct map *map = mn->map;
1404                 if (!map){
1405                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1406                         goto out_err;
1407                 }
1408
1409                 /* construct a tmp map_node for writing purposes */
1410                 char *target = xseg_get_target(peer->xseg, req);
1411                 struct map_node newmn = *mn;
1412                 newmn.flags = MF_OBJECT_EXIST;
1413                 strncpy(newmn.object, target, req->targetlen);
1414                 newmn.object[req->targetlen] = 0;
1415                 newmn.objectlen = req->targetlen;
1416                 newmn.objectidx = mn->objectidx; 
1417                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1418                 if (!xreq){
1419                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1420                                         "\n\t of map %s [%llu]",
1421                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1422                         goto out_err;
1423                 }
1424                 mn->flags |= MF_OBJECT_WRITING;
1425                 __set_copyup_node (mio, xreq, mn);
1426
1427                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1428         } else {
1429                 //wtf??
1430                 ;
1431         }
1432
1433 out:
1434         xseg_put_request(peer->xseg, req, pr->portno);
1435         return;
1436
1437 out_err:
1438         mio->copyups--;
1439         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1440         mio->err = 1;
1441         if (mn)
1442                 signal_mapnode(mn);
1443         signal_pr(pr);
1444         goto out;
1445
1446 }
1447
1448 struct r2o {
1449         struct map_node *mn;
1450         uint64_t offset;
1451         uint64_t size;
1452 };
1453
1454 static int req2objs(struct peer_req *pr, struct map *map, int write)
1455 {
1456         int r = 0;
1457         struct peerd *peer = pr->peer;
1458         struct mapper_io *mio = __get_mapper_io(pr);
1459         char *target = xseg_get_target(peer->xseg, pr->req);
1460         uint32_t nr_objs = calc_nr_obj(pr->req);
1461         uint64_t size = sizeof(struct xseg_reply_map) + 
1462                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1463         uint32_t idx, i;
1464         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1465         struct map_node *mn;
1466         mio->copyups = 0;
1467         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1468
1469         /* get map_nodes of request */
1470         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1471         if (!mns){
1472                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1473                 return -1;
1474         }
1475         idx = 0;
1476         rem_size = pr->req->size;
1477         obj_index = pr->req->offset / block_size;
1478         obj_offset = pr->req->offset & (block_size -1); //modulo
1479         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1480         mn = get_mapnode(map, obj_index);
1481         if (!mn) {
1482                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1483                 r = -1;
1484                 goto out;
1485         }
1486         mns[idx].mn = mn;
1487         mns[idx].offset = obj_offset;
1488         mns[idx].size = obj_size;
1489         rem_size -= obj_size;
1490         while (rem_size > 0) {
1491                 idx++;
1492                 obj_index++;
1493                 obj_offset = 0;
1494                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1495                 rem_size -= obj_size;
1496                 mn = get_mapnode(map, obj_index);
1497                 if (!mn) {
1498                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1499                         r = -1;
1500                         goto out;
1501                 }
1502                 mns[idx].mn = mn;
1503                 mns[idx].offset = obj_offset;
1504                 mns[idx].size = obj_size;
1505         }
1506         if (write) {
1507                 int can_wait = 0;
1508                 mio->cb=copyup_cb;
1509                 /* do a first scan and issue as many copyups as we can.
1510                  * then retry and wait when an object is not ready.
1511                  * this could be done better, since now we wait also on the
1512                  * pending copyups
1513                  */
1514                 int j;
1515                 for (j = 0; j < 2 && !mio->err; j++) {
1516                         for (i = 0; i < (idx+1); i++) {
1517                                 mn = mns[i].mn;
1518                                 //do copyups
1519                                 if (mn->flags & MF_OBJECT_NOT_READY){
1520                                         if (!can_wait)
1521                                                 continue;
1522                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1523                                         if (mn->flags & MF_OBJECT_DESTROYED){
1524                                                 mio->err = 1;
1525                                                 continue;
1526                                         }
1527                                 }
1528
1529                                 if (!(mn->flags & MF_OBJECT_EXIST)) {
1530                                         //calc new_target, copy up object
1531                                         if (copyup_object(peer, mn, pr) == NULL){
1532                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1533                                                 mio->err = 1;
1534                                         } else {
1535                                                 mio->copyups++;
1536                                         }
1537                                 }
1538
1539                                 if (mio->err){
1540                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1541                                         break;
1542                                 }
1543                         }
1544                         can_wait = 1;
1545                 }
1546                 wait_on_pr(pr, mio->copyups > 0);
1547         }
1548
1549         if (mio->err){
1550                 r = -1;
1551                 XSEGLOG2(&lc, E, "Mio->err");
1552                 goto out;
1553         }
1554
1555         /* resize request to fit reply */
1556         char buf[XSEG_MAX_TARGETLEN];
1557         strncpy(buf, target, pr->req->targetlen);
1558         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1559         if (r < 0) {
1560                 XSEGLOG2(&lc, E, "Cannot resize request");
1561                 goto out;
1562         }
1563         target = xseg_get_target(peer->xseg, pr->req);
1564         strncpy(target, buf, pr->req->targetlen);
1565
1566         /* structure reply */
1567         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1568         reply->cnt = nr_objs;
1569         for (i = 0; i < (idx+1); i++) {
1570                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1571                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1572                 reply->segs[i].offset = mns[i].offset;
1573                 reply->segs[i].size = mns[i].size;
1574         }
1575 out:
1576         for (i = 0; i < idx; i++) {
1577                 put_mapnode(mns[i].mn);
1578         }
1579         free(mns);
1580         mio->cb = NULL;
1581         return r;
1582 }
1583
1584 static int do_dropcache(struct peer_req *pr, struct map *map)
1585 {
1586         struct map_node *mn;
1587         struct peerd *peer = pr->peer;
1588         struct mapperd *mapper = __get_mapperd(peer);
1589         uint64_t i;
1590         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1591         map->flags |= MF_MAP_DROPPING_CACHE;
1592         for (i = 0; i < calc_map_obj(map); i++) {
1593                 mn = get_mapnode(map, i);
1594                 if (mn) {
1595                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1596                                 //make sure all pending operations on all objects are completed
1597                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1598                                 mn->flags |= MF_OBJECT_DESTROYED;
1599                         }
1600                         put_mapnode(mn);
1601                 }
1602         }
1603         map->flags &= ~MF_MAP_DROPPING_CACHE;
1604         map->flags |= MF_MAP_DESTROYED;
1605         remove_map(mapper, map);
1606         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1607         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1608         return 0;
1609 }
1610
1611 static int do_info(struct peer_req *pr, struct map *map)
1612 {
1613         struct peerd *peer = pr->peer;
1614         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1615         xinfo->size = map->size;
1616         return 0;
1617 }
1618
1619
1620 static int do_close(struct peer_req *pr, struct map *map)
1621 {
1622         if (map->flags & MF_MAP_EXCLUSIVE){
1623                 /* do not drop cache if close failed and map not deleted */
1624                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
1625                         return -1;
1626         }
1627         return do_dropcache(pr, map);
1628 }
1629
1630 static int do_destroy(struct peer_req *pr, struct map *map)
1631 {
1632         uint64_t i;
1633         struct peerd *peer = pr->peer;
1634         struct mapper_io *mio = __get_mapper_io(pr);
1635         struct map_node *mn;
1636         struct xseg_request *req;
1637
1638         if (!(map->flags & MF_MAP_EXCLUSIVE))
1639                 return -1;
1640
1641         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1642         req = __delete_map(pr, map);
1643         if (!req)
1644                 return -1;
1645         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1646         if (req->state & XS_FAILED){
1647                 xseg_put_request(peer->xseg, req, pr->portno);
1648                 map->flags &= ~MF_MAP_DELETING;
1649                 return -1;
1650         }
1651         xseg_put_request(peer->xseg, req, pr->portno);
1652
1653         uint64_t nr_obj = calc_map_obj(map);
1654         mio->cb = deletion_cb;
1655         mio->del_pending = 0;
1656         mio->err = 0;
1657         for (i = 0; i < nr_obj; i++){
1658
1659                 /* throttle pending deletions
1660                  * this should be nr_ops of the blocker, but since we don't know
1661                  * that, we assume based on our own nr_ops
1662                  */
1663                 wait_on_pr(pr, mio->del_pending >= peer->nr_ops);
1664
1665                 mn = get_mapnode(map, i);
1666                 if (!mn)
1667                         continue;
1668                 if (mn->flags & MF_OBJECT_DESTROYED){
1669                         put_mapnode(mn);
1670                         continue;
1671                 }
1672                 if (!(mn->flags & MF_OBJECT_EXIST)){
1673                         mn->flags |= MF_OBJECT_DESTROYED;
1674                         put_mapnode(mn);
1675                         continue;
1676                 }
1677
1678                 // make sure all pending operations on all objects are completed
1679                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1680
1681                 req = __delete_object(pr, mn);
1682                 if (!req){
1683                         mio->err = 1;
1684                         put_mapnode(mn);
1685                         continue;
1686                 }
1687                 mio->del_pending++;
1688                 /* do not put_mapnode here. cb does that */
1689         }
1690
1691         wait_on_pr(pr, mio->del_pending > 0);
1692
1693         mio->cb = NULL;
1694         map->flags &= ~MF_MAP_DELETING;
1695         map->flags |= MF_MAP_DELETED;
1696         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1697         return do_close(pr, map);
1698 }
1699
1700 static int do_mapr(struct peer_req *pr, struct map *map)
1701 {
1702         struct peerd *peer = pr->peer;
1703         int r = req2objs(pr, map, 0);
1704         if  (r < 0){
1705                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1706                                 map->volume, 
1707                                 (unsigned long long) pr->req->offset, 
1708                                 (unsigned long long) (pr->req->offset + pr->req->size));
1709                 return -1;
1710         }
1711         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1712                         map->volume, 
1713                         (unsigned long long) pr->req->offset, 
1714                         (unsigned long long) (pr->req->offset + pr->req->size));
1715         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1716                         (unsigned long long) pr->req->offset,
1717                         (unsigned long long) pr->req->size);
1718         char buf[XSEG_MAX_TARGETLEN+1];
1719         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1720         int i;
1721         for (i = 0; i < reply->cnt; i++) {
1722                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1723                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1724                 buf[reply->segs[i].targetlen] = 0;
1725                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1726                                 (unsigned long long) reply->segs[i].offset,
1727                                 (unsigned long long) reply->segs[i].size);
1728         }
1729         return 0;
1730 }
1731
1732 static int do_mapw(struct peer_req *pr, struct map *map)
1733 {
1734         struct peerd *peer = pr->peer;
1735         int r = req2objs(pr, map, 1);
1736         if  (r < 0){
1737                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1738                                 map->volume, 
1739                                 (unsigned long long) pr->req->offset, 
1740                                 (unsigned long long) (pr->req->offset + pr->req->size));
1741                 return -1;
1742         }
1743         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1744                         map->volume, 
1745                         (unsigned long long) pr->req->offset, 
1746                         (unsigned long long) (pr->req->offset + pr->req->size));
1747         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1748                         (unsigned long long) pr->req->offset,
1749                         (unsigned long long) pr->req->size);
1750         char buf[XSEG_MAX_TARGETLEN+1];
1751         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1752         int i;
1753         for (i = 0; i < reply->cnt; i++) {
1754                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1755                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1756                 buf[reply->segs[i].targetlen] = 0;
1757                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1758                                 (unsigned long long) reply->segs[i].offset,
1759                                 (unsigned long long) reply->segs[i].size);
1760         }
1761         return 0;
1762 }
1763
1764 //here map is the parent map
1765 static int do_clone(struct peer_req *pr, struct map *map)
1766 {
1767         int r;
1768         char buf[XSEG_MAX_TARGETLEN];
1769         struct peerd *peer = pr->peer;
1770         struct mapperd *mapper = __get_mapperd(peer);
1771         char *target = xseg_get_target(peer->xseg, pr->req);
1772         struct map *clonemap;
1773         struct xseg_request_clone *xclone =
1774                 (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1775
1776         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1777
1778         clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
1779         if (!clonemap)
1780                 return -1;
1781
1782         /* open map to get exclusive access to map */
1783         r = open_map(pr, clonemap, 0);
1784         if (r < 0){
1785                 XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
1786                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
1787                 goto out_err;
1788         }
1789         r = load_map(pr, clonemap);
1790         if (r >= 0) {
1791                 XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
1792                 goto out_err;
1793         }
1794
1795         if (xclone->size == -1)
1796                 clonemap->size = map->size;
1797         else
1798                 clonemap->size = xclone->size;
1799         if (clonemap->size < map->size){
1800                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1801                                 "\n\t for requested clone %s",
1802                                 (unsigned long long) xclone->size,
1803                                 (unsigned long long) map->size, clonemap->volume);
1804                 goto out_err;
1805         }
1806         if (clonemap->size > MAX_VOLUME_SIZE) {
1807                 XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
1808                                 "\n\t for volume %s",
1809                                 clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
1810                 goto out_err;
1811         }
1812
1813         //alloc and init map_nodes
1814         unsigned long c = clonemap->size/block_size + 1;
1815         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1816         if (!map_nodes){
1817                 goto out_err;
1818         }
1819         int i;
1820         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1821                 struct map_node *mn = get_mapnode(map, i);
1822                 if (mn) {
1823                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1824                         map_nodes[i].objectlen = mn->objectlen;
1825                         put_mapnode(mn);
1826                 } else {
1827                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1828                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1829                 }
1830                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1831                 map_nodes[i].flags = 0;
1832                 map_nodes[i].objectidx = i;
1833                 map_nodes[i].map = clonemap;
1834                 map_nodes[i].ref = 1;
1835                 map_nodes[i].waiters = 0;
1836                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1837                 r = insert_object(clonemap, &map_nodes[i]);
1838                 if (r < 0){
1839                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1840                         goto out_err;
1841                 }
1842         }
1843
1844         r = write_map(pr, clonemap);
1845         if (r < 0){
1846                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1847                 goto out_err;
1848         }
1849         do_close(pr, clonemap);
1850         return 0;
1851
1852 out_err:
1853         do_close(pr, clonemap);
1854         return -1;
1855 }
1856
1857 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1858 {
1859         int r, opened = 0;
1860         if (flags & MF_EXCLUSIVE){
1861                 r = open_map(pr, map, flags);
1862                 if (r < 0) {
1863                         if (flags & MF_FORCE){
1864                                 return -1;
1865                         }
1866                 } else {
1867                         opened = 1;
1868                 }
1869         }
1870         r = load_map(pr, map);
1871         if (r < 0 && opened){
1872                 close_map(pr, map);
1873         }
1874         return r;
1875 }
1876
1877 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1878                         uint32_t flags)
1879 {
1880         int r;
1881         struct peerd *peer = pr->peer;
1882         struct mapperd *mapper = __get_mapperd(peer);
1883         struct map *map = find_map_len(mapper, name, namelen, flags);
1884         if (!map){
1885                 if (flags & MF_LOAD){
1886                         map = create_map(mapper, name, namelen, flags);
1887                         if (!map)
1888                                 return NULL;
1889                         r = open_load_map(pr, map, flags);
1890                         if (r < 0){
1891                                 do_dropcache(pr, map);
1892                                 return NULL;
1893                         }
1894                 } else {
1895                         return NULL;
1896                 }
1897         } else if (map->flags & MF_MAP_DESTROYED){
1898                 return NULL;
1899         }
1900         __get_map(map);
1901         return map;
1902
1903 }
1904
1905 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1906                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1907 {
1908         //struct peerd *peer = pr->peer;
1909         struct map *map;
1910 start:
1911         map = get_map(pr, name, namelen, flags);
1912         if (!map)
1913                 return -1;
1914         if (map->flags & MF_MAP_NOT_READY){
1915                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1916                 put_map(map);
1917                 goto start;
1918         }
1919         int r = action(pr, map);
1920         //always drop cache if map not read exclusively
1921         if (!(map->flags & MF_MAP_EXCLUSIVE))
1922                 do_dropcache(pr, map);
1923         signal_map(map);
1924         put_map(map);
1925         return r;
1926 }
1927
1928 void * handle_info(struct peer_req *pr)
1929 {
1930         struct peerd *peer = pr->peer;
1931         char *target = xseg_get_target(peer->xseg, pr->req);
1932         int r = map_action(do_info, pr, target, pr->req->targetlen,
1933                                 MF_ARCHIP|MF_LOAD);
1934         if (r < 0)
1935                 fail(peer, pr);
1936         else
1937                 complete(peer, pr);
1938         ta--;
1939         return NULL;
1940 }
1941
1942 void * handle_clone(struct peer_req *pr)
1943 {
1944         int r;
1945         struct peerd *peer = pr->peer;
1946         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1947         if (!xclone) {
1948                 r = -1;
1949                 goto out;
1950         }
1951
1952         if (xclone->targetlen){
1953                 /* if snap was defined */
1954                 //support clone only from pithos
1955                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1956                                         MF_LOAD);
1957         } else {
1958                 /* else try to create a new volume */
1959                 XSEGLOG2(&lc, I, "Creating volume");
1960                 if (!xclone->size){
1961                         XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
1962                         r = -1;
1963                         goto out;
1964                 }
1965                 if (xclone->size > MAX_VOLUME_SIZE) {
1966                         XSEGLOG2(&lc, E, "Requested size %llu > max volume "
1967                                         "size %llu", xclone->size, MAX_VOLUME_SIZE);
1968                         r = -1;
1969                         goto out;
1970                 }
1971
1972                 struct map *map;
1973                 char *target = xseg_get_target(peer->xseg, pr->req);
1974
1975                 //create a new empty map of size
1976                 map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
1977                 if (!map){
1978                         r = -1;
1979                         goto out;
1980                 }
1981                 /* open map to get exclusive access to map */
1982                 r = open_map(pr, map, 0);
1983                 if (r < 0){
1984                         XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
1985                         XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
1986                         do_dropcache(pr, map);
1987                         r = -1;
1988                         goto out;
1989                 }
1990                 r = load_map(pr, map);
1991                 if (r >= 0) {
1992                         XSEGLOG2(&lc, E, "Map exists %s", map->volume);
1993                         do_close(pr, map);
1994                         r = -1;
1995                         goto out;
1996                 }
1997                 map->size = xclone->size;
1998                 //populate_map with zero objects;
1999                 uint64_t nr_objs = xclone->size / block_size;
2000                 if (xclone->size % block_size)
2001                         nr_objs++;
2002
2003                 struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
2004                 if (!map_nodes){
2005                         do_close(pr, map);
2006                         r = -1;
2007                         goto out;
2008                 }
2009
2010                 uint64_t i;
2011                 for (i = 0; i < nr_objs; i++) {
2012                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
2013                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
2014                         map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
2015                         map_nodes[i].flags = 0;
2016                         map_nodes[i].objectidx = i;
2017                         map_nodes[i].map = map;
2018                         map_nodes[i].ref = 1;
2019                         map_nodes[i].waiters = 0;
2020                         map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
2021                         r = insert_object(map, &map_nodes[i]);
2022                         if (r < 0){
2023                                 do_close(pr, map);
2024                                 r = -1;
2025                                 goto out;
2026                         }
2027                 }
2028                 r = write_map(pr, map);
2029                 if (r < 0){
2030                         XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
2031                         do_close(pr, map);
2032                         goto out;
2033                 }
2034                 XSEGLOG2(&lc, I, "Volume %s created", map->volume);
2035                 r = 0;
2036                 do_close(pr, map); //drop cache here for consistency
2037         }
2038 out:
2039         if (r < 0)
2040                 fail(peer, pr);
2041         else
2042                 complete(peer, pr);
2043         ta--;
2044         return NULL;
2045 }
2046
2047 void * handle_mapr(struct peer_req *pr)
2048 {
2049         struct peerd *peer = pr->peer;
2050         char *target = xseg_get_target(peer->xseg, pr->req);
2051         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2052                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2053         if (r < 0)
2054                 fail(peer, pr);
2055         else
2056                 complete(peer, pr);
2057         ta--;
2058         return NULL;
2059 }
2060
2061 void * handle_mapw(struct peer_req *pr)
2062 {
2063         struct peerd *peer = pr->peer;
2064         char *target = xseg_get_target(peer->xseg, pr->req);
2065         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2066                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2067         if (r < 0)
2068                 fail(peer, pr);
2069         else
2070                 complete(peer, pr);
2071         XSEGLOG2(&lc, D, "Ta: %d", ta);
2072         ta--;
2073         return NULL;
2074 }
2075
2076 void * handle_destroy(struct peer_req *pr)
2077 {
2078         struct peerd *peer = pr->peer;
2079         char *target = xseg_get_target(peer->xseg, pr->req);
2080         /* request EXCLUSIVE access, but do not force it.
2081          * check if succeeded on do_destroy
2082          */
2083         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2084                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2085         if (r < 0)
2086                 fail(peer, pr);
2087         else
2088                 complete(peer, pr);
2089         ta--;
2090         return NULL;
2091 }
2092
2093 void * handle_close(struct peer_req *pr)
2094 {
2095         struct peerd *peer = pr->peer;
2096         char *target = xseg_get_target(peer->xseg, pr->req);
2097         //here we do not want to load
2098         int r = map_action(do_close, pr, target, pr->req->targetlen,
2099                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2100         if (r < 0)
2101                 fail(peer, pr);
2102         else
2103                 complete(peer, pr);
2104         ta--;
2105         return NULL;
2106 }
2107
2108 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
2109                         struct xseg_request *req)
2110 {
2111         //struct mapperd *mapper = __get_mapperd(peer);
2112         struct mapper_io *mio = __get_mapper_io(pr);
2113         void *(*action)(struct peer_req *) = NULL;
2114
2115         mio->state = ACCEPTED;
2116         mio->err = 0;
2117         mio->cb = NULL;
2118         switch (pr->req->op) {
2119                 /* primary xseg operations of mapper */
2120                 case X_CLONE: action = handle_clone; break;
2121                 case X_MAPR: action = handle_mapr; break;
2122                 case X_MAPW: action = handle_mapw; break;
2123 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2124                 case X_INFO: action = handle_info; break;
2125                 case X_DELETE: action = handle_destroy; break;
2126                 case X_CLOSE: action = handle_close; break;
2127                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2128         }
2129         if (action){
2130                 ta++;
2131                 mio->active = 1;
2132                 st_thread_create(action, pr, 0, 0);
2133         }
2134         return 0;
2135
2136 }
2137
2138 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2139                 enum dispatch_reason reason)
2140 {
2141         struct mapperd *mapper = __get_mapperd(peer);
2142         (void) mapper;
2143         struct mapper_io *mio = __get_mapper_io(pr);
2144         (void) mio;
2145
2146
2147         if (reason == dispatch_accept)
2148                 dispatch_accepted(peer, pr, req);
2149         else {
2150                 if (mio->cb){
2151                         mio->cb(pr, req);
2152                 } else { 
2153                         signal_pr(pr);
2154                 }
2155         }
2156         return 0;
2157 }
2158
2159 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2160 {
2161         int i;
2162
2163         //FIXME error checks
2164         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2165         peer->priv = mapperd;
2166         mapper = mapperd;
2167         mapper->hashmaps = xhash_new(3, STRING);
2168
2169         for (i = 0; i < peer->nr_ops; i++) {
2170                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2171                 mio->copyups_nodes = xhash_new(3, INTEGER);
2172                 mio->copyups = 0;
2173                 mio->err = 0;
2174                 mio->active = 0;
2175                 peer->peer_reqs[i].priv = mio;
2176         }
2177
2178         for (i = 0; i < argc; i++) {
2179                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2180                         mapper->bportno = atoi(argv[i+1]);
2181                         i += 1;
2182                         continue;
2183                 }
2184                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2185                         mapper->mbportno = atoi(argv[i+1]);
2186                         i += 1;
2187                         continue;
2188                 }
2189                 /* enforce only one thread */
2190                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2191                         int t = atoi(argv[i+1]);
2192                         if (t != 1) {
2193                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2194                                 return -1;
2195                         }
2196                         i += 1;
2197                         continue;
2198                 }
2199         }
2200
2201         const struct sched_param param = { .sched_priority = 99 };
2202         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2203         /* FIXME maybe place it in peer
2204          * should be done for each port (sportno to eportno)
2205          */
2206         xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2207         xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2208
2209
2210 //      test_map(peer);
2211
2212         return 0;
2213 }
2214
2215 /* FIXME this should not be here */
2216 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2217 {
2218         struct xseg *xseg = peer->xseg;
2219         xport portno_start = peer->portno_start;
2220         xport portno_end = peer->portno_end;
2221         struct peer_req *pr;
2222         xport i;
2223         int  r, c = 0;
2224         struct xseg_request *req, *received;
2225         xseg_prepare_wait(xseg, portno_start);
2226         while(1) {
2227                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2228                 c = 1;
2229                 while (c){
2230                         c = 0;
2231                         for (i = portno_start; i <= portno_end; i++) {
2232                                 received = xseg_receive(xseg, i, 0);
2233                                 if (received) {
2234                                         c = 1;
2235                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2236                                         if (r < 0 || !pr || received != expected_req){
2237                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2238                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2239                                                 if (p == NoPort){
2240                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2241                                                         xseg_put_request(xseg, received, portno_start);
2242                                                         continue;
2243                                                 } else {
2244                                                         xseg_signal(xseg, p);
2245                                                 }
2246                                         } else {
2247                                                 xseg_cancel_wait(xseg, portno_start);
2248                                                 return 0;
2249                                         }
2250                                 }
2251                         }
2252                 }
2253                 xseg_wait_signal(xseg, 1000000UL);
2254         }
2255 }
2256
2257
2258 void custom_peer_finalize(struct peerd *peer)
2259 {
2260         struct mapperd *mapper = __get_mapperd(peer);
2261         struct peer_req *pr = alloc_peer_req(peer);
2262         if (!pr){
2263                 XSEGLOG2(&lc, E, "Cannot get peer request");
2264                 return;
2265         }
2266         int r;
2267         struct map *map;
2268         struct xseg_request *req;
2269         xhash_iter_t it;
2270         xhashidx key, val;
2271         xhash_iter_init(mapper->hashmaps, &it);
2272         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2273                 map = (struct map *)val;
2274                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2275                         continue;
2276                 req = __close_map(pr, map);
2277                 if (!req)
2278                         continue;
2279                 wait_reply(peer, req);
2280                 if (!(req->state & XS_SERVED))
2281                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2282                 map->flags &= ~MF_MAP_CLOSING;
2283                 xseg_put_request(peer->xseg, req, pr->portno);
2284         }
2285         return;
2286
2287
2288 }
2289
2290 void print_obj(struct map_node *mn)
2291 {
2292         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2293                         (unsigned long long) mn->objectidx, mn->object, 
2294                         (unsigned int) mn->objectlen, 
2295                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2296 }
2297
2298 void print_map(struct map *m)
2299 {
2300         uint64_t nr_objs = m->size/block_size;
2301         if (m->size % block_size)
2302                 nr_objs++;
2303         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2304                         m->volume, m->volumelen, 
2305                         (unsigned long long) m->size, 
2306                         (unsigned long long) nr_objs,
2307                         m->version);
2308         uint64_t i;
2309         struct map_node *mn;
2310         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2311                 return;
2312         for (i = 0; i < nr_objs; i++) {
2313                 mn = find_object(m, i);
2314                 if (!mn){
2315                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2316                         continue;
2317                 }
2318                 print_obj(mn);
2319         }
2320 }
2321
2322 /*
2323 void test_map(struct peerd *peer)
2324 {
2325         int i,j, ret;
2326         //struct sha256_ctx sha256ctx;
2327         unsigned char buf[SHA256_DIGEST_SIZE];
2328         char buf_new[XSEG_MAX_TARGETLEN + 20];
2329         struct map *m = malloc(sizeof(struct map));
2330         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2331         m->volume[XSEG_MAX_TARGETLEN] = 0;
2332         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2333         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2334         m->volumelen = XSEG_MAX_TARGETLEN;
2335         m->size = 100*block_size;
2336         m->objects = xhash_new(3, INTEGER);
2337         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2338         for (i = 0; i < 100; i++) {
2339                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2340                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2341                 
2342                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2343                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2344                 }
2345                 map_node[i].objectidx = i;
2346                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2347                 map_node[i].flags = MF_OBJECT_EXIST;
2348                 ret = insert_object(m, &map_node[i]);
2349         }
2350
2351         char *data = malloc(block_size);
2352         mapheader_to_map(m, data);
2353         uint64_t pos = mapheader_size;
2354
2355         for (i = 0; i < 100; i++) {
2356                 map_node = find_object(m, i);
2357                 if (!map_node){
2358                         printf("no object node %d \n", i);
2359                         exit(1);
2360                 }
2361                 object_to_map(data+pos, map_node);
2362                 pos += objectsize_in_map;
2363         }
2364 //      print_map(m);
2365
2366         struct map *m2 = malloc(sizeof(struct map));
2367         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2368         m->volume[XSEG_MAX_TARGETLEN] = 0;
2369         m->volumelen = XSEG_MAX_TARGETLEN;
2370
2371         m2->objects = xhash_new(3, INTEGER);
2372         ret = read_map(peer, m2, data);
2373 //      print_map(m2);
2374
2375         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2376         ssize_t r, sum = 0;
2377         while (sum < block_size) {
2378                 r = write(fd, data + sum, block_size -sum);
2379                 if (r < 0){
2380                         perror("write");
2381                         printf("write error\n");
2382                         exit(1);
2383                 } 
2384                 sum += r;
2385         }
2386         close(fd);
2387         map_node = find_object(m, 0);
2388         free(map_node);
2389         free(m);
2390 }
2391 */