Merge branch 'xseg-refactor' into debian
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <sched.h>
15 #include <sys/syscall.h>
16 #include <openssl/sha.h>
17
18 /* general mapper flags */
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22 #define MF_ARCHIP       (1 << 3)
23
24 #ifndef SHA256_DIGEST_SIZE
25 #define SHA256_DIGEST_SIZE 32
26 #endif
27 /* hex representation of sha256 value takes up double the sha256 size */
28 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
29
30 #define block_size (1<<22) //FIXME this should be defined here?
31
32 /* transparency byte + max object len in disk */
33 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
34
35 /* Map header contains:
36  *      map version
37  *      volume size
38  */
39 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
40
41
42 #define MAPPER_PREFIX "archip_"
43 #define MAPPER_PREFIX_LEN 7
44
45 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
47
48 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
50 #endif
51
52 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
53
54 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
56 #endif
57
58 #define MAX_VOLUME_SIZE \
59 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
60
61
62 char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
64
65 /* dispatch_internal mapper states */
66 enum mapper_state {
67         ACCEPTED = 0,
68         WRITING = 1,
69         COPYING = 2,
70         DELETING = 3,
71         DROPPING_CACHE = 4
72 };
73
74 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
75
76
77 /* mapper object flags */
78 #define MF_OBJECT_EXIST         (1 << 0)
79 #define MF_OBJECT_COPYING       (1 << 1)
80 #define MF_OBJECT_WRITING       (1 << 2)
81 #define MF_OBJECT_DELETING      (1 << 3)
82 #define MF_OBJECT_DESTROYED     (1 << 5)
83
84 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
85                                         MF_OBJECT_DELETING)
86 struct map_node {
87         uint32_t flags;
88         uint32_t objectidx;
89         uint32_t objectlen;
90         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
91         struct map *map;
92         uint32_t ref;
93         uint32_t waiters;
94         st_cond_t cond;
95 };
96
97
98 #define wait_on_pr(__pr, __condition__)         \
99         while (__condition__){                  \
100                 ta--;                           \
101                 __get_mapper_io(pr)->active = 0;\
102                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
103                 st_cond_wait(__pr->cond);       \
104         }
105
106 #define wait_on_mapnode(__mn, __condition__)    \
107         while (__condition__){                  \
108                 ta--;                           \
109                 __mn->waiters++;                \
110                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
111                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
112                 st_cond_wait(__mn->cond);       \
113         }
114
115 #define wait_on_map(__map, __condition__)       \
116         while (__condition__){                  \
117                 ta--;                           \
118                 __map->waiters++;               \
119                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
120                                    __map, __map->volume, __map->waiters, ta); \
121                 st_cond_wait(__map->cond);      \
122         }
123
124 #define signal_pr(__pr)                         \
125         do {                                    \
126                 if (!__get_mapper_io(pr)->active){\
127                         ta++;                   \
128                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
129                         __get_mapper_io(pr)->active = 1;\
130                         st_cond_signal(__pr->cond);     \
131                 }                               \
132         }while(0)
133
134 #define signal_map(__map)                       \
135         do {                                    \
136                 if (__map->waiters) {           \
137                         ta += 1;                \
138                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
139                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
140                         __map->waiters--;       \
141                         st_cond_signal(__map->cond);    \
142                 }                               \
143         }while(0)
144
145 #define signal_mapnode(__mn)                    \
146         do {                                    \
147                 if (__mn->waiters) {            \
148                         ta += __mn->waiters;    \
149                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
150                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
151                         __mn->waiters = 0;      \
152                         st_cond_broadcast(__mn->cond);  \
153                 }                               \
154         }while(0)
155
156
157 /* map flags */
158 #define MF_MAP_LOADING          (1 << 0)
159 #define MF_MAP_DESTROYED        (1 << 1)
160 #define MF_MAP_WRITING          (1 << 2)
161 #define MF_MAP_DELETING         (1 << 3)
162 #define MF_MAP_DROPPING_CACHE   (1 << 4)
163 #define MF_MAP_EXCLUSIVE        (1 << 5)
164 #define MF_MAP_OPENING          (1 << 6)
165 #define MF_MAP_CLOSING          (1 << 7)
166 #define MF_MAP_DELETED          (1 << 8)
167
168 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
170
171 struct map {
172         uint32_t version;
173         uint32_t flags;
174         uint64_t size;
175         uint32_t volumelen;
176         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
177         xhash_t *objects;       /* obj_index --> map_node */
178         uint32_t ref;
179         uint32_t waiters;
180         st_cond_t cond;
181 };
182
183 struct mapperd {
184         xport bportno;          /* blocker that accesses data */
185         xport mbportno;         /* blocker that accesses maps */
186         xhash_t *hashmaps; // hash_function(target) --> struct map
187 };
188
189 struct mapper_io {
190         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
191         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
192         struct map_node *copyup_node;
193         volatile int err;                       /* error flag */
194         volatile uint64_t del_pending;
195         uint64_t delobj;
196         uint64_t dcobj;
197         cb_t cb;
198         enum mapper_state state;
199         volatile int active;
200 };
201
202 /* global vars */
203 struct mapperd *mapper;
204
205 void print_map(struct map *m);
206
207
208 void custom_peer_usage()
209 {
210         fprintf(stderr, "Custom peer options: \n"
211                         "-bp  : port for block blocker(!)\n"
212                         "-mbp : port for map blocker\n"
213                         "\n");
214 }
215
216
217 /*
218  * Helper functions
219  */
220
221 static inline struct mapperd * __get_mapperd(struct peerd *peer)
222 {
223         return (struct mapperd *) peer->priv;
224 }
225
226 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
227 {
228         return (struct mapper_io *) pr->priv;
229 }
230
231 static inline uint64_t calc_map_obj(struct map *map)
232 {
233         if (map->size == -1)
234                 return 0;
235         uint64_t nr_objs = map->size / block_size;
236         if (map->size % block_size)
237                 nr_objs++;
238         return nr_objs;
239 }
240
241 static uint32_t calc_nr_obj(struct xseg_request *req)
242 {
243         unsigned int r = 1;
244         uint64_t rem_size = req->size;
245         uint64_t obj_offset = req->offset & (block_size -1); //modulo
246         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247         rem_size -= obj_size;
248         while (rem_size > 0) {
249                 obj_size = (rem_size > block_size) ? block_size : rem_size;
250                 rem_size -= obj_size;
251                 r++;
252         }
253
254         return r;
255 }
256
257 /* hexlify function. 
258  * Unsafe. Doesn't check if data length is odd!
259  */
260 static void hexlify(unsigned char *data, char *hex)
261 {
262         int i;
263         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264                 sprintf(hex+2*i, "%02x", data[i]);
265 }
266
267 static void unhexlify(char *hex, unsigned char *data)
268 {
269         int i;
270         char c;
271         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
272                 data[i] = 0;
273                 c = hex[2*i];
274                 if (isxdigit(c)){
275                         if (isdigit(c)){
276                                 c-= '0';
277                         }
278                         else {
279                                 c = tolower(c);
280                                 c = c-'a' + 10;
281                         }
282                 }
283                 else {
284                         c = 0;
285                 }
286                 data[i] |= (c << 4) & 0xF0;
287                 c = hex[2*i+1];
288                 if (isxdigit(c)){
289                         if (isdigit(c)){
290                                 c-= '0';
291                         }
292                         else {
293                                 c = tolower(c);
294                                 c = c-'a' + 10;
295                         }
296                 }
297                 else {
298                         c = 0;
299                 }
300                 data[i] |= c & 0x0F;
301         }
302 }
303 /*
304  * Maps handling functions
305  */
306
307 static struct map * find_map(struct mapperd *mapper, char *volume)
308 {
309         struct map *m = NULL;
310         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
311                                 (xhashidx *) &m);
312         if (r < 0)
313                 return NULL;
314         return m;
315 }
316
317 static struct map * find_map_len(struct mapperd *mapper, char *target,
318                                         uint32_t targetlen, uint32_t flags)
319 {
320         char buf[XSEG_MAX_TARGETLEN+1];
321         if (flags & MF_ARCHIP){
322                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325                 targetlen += MAPPER_PREFIX_LEN;
326         }
327         else {
328                 strncpy(buf, target, targetlen);
329                 buf[targetlen] = 0;
330         }
331
332         if (targetlen > MAX_VOLUME_LEN){
333                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334                                         targetlen, MAX_VOLUME_LEN);
335                 return NULL;
336         }
337
338         XSEGLOG2(&lc, D, "looking up map %s, len %u",
339                         buf, targetlen);
340         return find_map(mapper, buf);
341 }
342
343
344 static int insert_map(struct mapperd *mapper, struct map *map)
345 {
346         int r = -1;
347
348         if (find_map(mapper, map->volume)){
349                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
350                 goto out;
351         }
352
353         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
354                         map->volume, strlen(map->volume), (unsigned long) map);
355         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356         while (r == -XHASH_ERESIZE) {
357                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
359                 if (!new_hashmap){
360                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361                                         (unsigned long long) shift);
362                         goto out;
363                 }
364                 mapper->hashmaps = new_hashmap;
365                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
366         }
367 out:
368         return r;
369 }
370
371 static int remove_map(struct mapperd *mapper, struct map *map)
372 {
373         int r = -1;
374
375         //assert no pending pr on map
376
377         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378         while (r == -XHASH_ERESIZE) {
379                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
381                 if (!new_hashmap){
382                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383                                         (unsigned long long) shift);
384                         goto out;
385                 }
386                 mapper->hashmaps = new_hashmap;
387                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
388         }
389 out:
390         return r;
391 }
392
393 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
394 {
395         int r;
396         xport p;
397         struct peerd *peer = pr->peer;
398         struct xseg_request *req;
399         struct mapperd *mapper = __get_mapperd(peer);
400         void *dummy;
401
402         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
403
404         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
405         if (!req){
406                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
407                                 map->volume);
408                 goto out_err;
409         }
410
411         r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
412         if (r < 0){
413                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
414                                 map->volume);
415                 goto out_put;
416         }
417
418         char *reqtarget = xseg_get_target(peer->xseg, req);
419         if (!reqtarget)
420                 goto out_put;
421         strncpy(reqtarget, map->volume, req->targetlen);
422         req->op = X_CLOSE;
423         req->size = 0;
424         req->offset = 0;
425         r = xseg_set_req_data(peer->xseg, req, pr);
426         if (r < 0){
427                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
428                                 map->volume);
429                 goto out_put;
430         }
431         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
432         if (p == NoPort){
433                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
434                                 map->volume);
435                 goto out_unset;
436         }
437         r = xseg_signal(peer->xseg, p);
438
439         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
440         return req;
441
442 out_unset:
443         xseg_get_req_data(peer->xseg, req, &dummy);
444 out_put:
445         xseg_put_request(peer->xseg, req, pr->portno);
446 out_err:
447         return NULL;
448 }
449
450 static int close_map(struct peer_req *pr, struct map *map)
451 {
452         int err;
453         struct xseg_request *req;
454         struct peerd *peer = pr->peer;
455
456         map->flags |= MF_MAP_CLOSING;
457         req = __close_map(pr, map);
458         if (!req)
459                 return -1;
460         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461         map->flags &= ~MF_MAP_CLOSING;
462         err = req->state & XS_FAILED;
463         xseg_put_request(peer->xseg, req, pr->portno);
464         if (err)
465                 return -1;
466         return 0;
467 }
468
469 /*
470 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
471                                 char *target, uint32_t targetlen, struct map **m)
472 {
473         struct mapperd *mapper = __get_mapperd(peer);
474         int r;
475         *m = find_map(mapper, target, targetlen);
476         if (*m) {
477                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478                 if ((*m)->flags & MF_MAP_NOT_READY) {
479                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
480                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
481                         return MF_PENDING;
482                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
483                 //      return -1;
484                 // 
485                 }else {
486                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
487                         return 0;
488                 }
489         }
490         r = open_map(peer, pr, target, targetlen, 0);
491         if (r < 0)
492                 return -1; //error
493         return MF_PENDING;      
494 }
495 */
496 /*
497  * Object handling functions
498  */
499
500 struct map_node *find_object(struct map *map, uint64_t obj_index)
501 {
502         struct map_node *mn;
503         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
504         if (r < 0)
505                 return NULL;
506         return mn;
507 }
508
509 static int insert_object(struct map *map, struct map_node *mn)
510 {
511         //FIXME no find object first
512         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513         if (r == -XHASH_ERESIZE) {
514                 unsigned long shift = xhash_grow_size_shift(map->objects);
515                 map->objects = xhash_resize(map->objects, shift, NULL);
516                 if (!map->objects)
517                         return -1;
518                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
519         }
520         return r;
521 }
522
523
524 /*
525  * map read/write functions 
526  */
527 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
528 {
529         hexlify(buf, mn->object);
530         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532         mn->flags = MF_OBJECT_EXIST;
533 }
534
535 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
536 {
537         char c = buf[0];
538         mn->flags = 0;
539         if (c){
540                 mn->flags |= MF_OBJECT_EXIST;
541                 strcpy(mn->object, MAPPER_PREFIX);
542                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543                 mn->object[MAX_OBJECT_LEN] = 0;
544                 mn->objectlen = strlen(mn->object);
545         }
546         else {
547                 hexlify(buf+1, mn->object);
548                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549                 mn->objectlen = strlen(mn->object);
550         }
551
552 }
553
554 static inline void object_to_map(char* buf, struct map_node *mn)
555 {
556         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
557         if (buf[0]){
558                 /* strip common prefix */
559                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
560         }
561         else {
562                 unhexlify(mn->object, (unsigned char *)(buf+1));
563         }
564 }
565
566 static inline void mapheader_to_map(struct map *m, char *buf)
567 {
568         uint64_t pos = 0;
569         memcpy(buf + pos, &m->version, sizeof(m->version));
570         pos += sizeof(m->version);
571         memcpy(buf + pos, &m->size, sizeof(m->size));
572         pos += sizeof(m->size);
573 }
574
575
576 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577                                 struct map *map, struct map_node *mn)
578 {
579         void *dummy;
580         struct mapperd *mapper = __get_mapperd(peer);
581         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582                                                         mapper->mbportno, X_ALLOC);
583         if (!req){
584                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
585                                 "(Map: %s [%llu]",
586                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
587                 goto out_err;
588         }
589         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
590         if (r < 0){
591                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
592                                 "(Map: %s [%llu]",
593                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
594                 goto out_put;
595         }
596         char *target = xseg_get_target(peer->xseg, req);
597         strncpy(target, map->volume, req->targetlen);
598         req->size = objectsize_in_map;
599         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
600         req->op = X_WRITE;
601         char *data = xseg_get_data(peer->xseg, req);
602         object_to_map(data, mn);
603
604         r = xseg_set_req_data(peer->xseg, req, pr);
605         if (r < 0){
606                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
607                                 "(Map: %s [%llu]",
608                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
609                 goto out_put;
610         }
611         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
612         if (p == NoPort){
613                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
614                                 "(Map: %s [%llu]",
615                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
616                 goto out_unset;
617         }
618         r = xseg_signal(peer->xseg, p);
619         if (r < 0)
620                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
621
622         XSEGLOG2(&lc, I, "Writing object %s \n\t"
623                         "Map: %s [%llu]",
624                         mn->object, map->volume, (unsigned long long) mn->objectidx);
625
626         return req;
627
628 out_unset:
629         xseg_get_req_data(peer->xseg, req, &dummy);
630 out_put:
631         xseg_put_request(peer->xseg, req, pr->portno);
632 out_err:
633         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
634                         "(Map: %s [%llu]",
635                         mn->object, map->volume, (unsigned long long) mn->objectidx);
636         return NULL;
637 }
638
639 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
640 {
641         void *dummy;
642         struct peerd *peer = pr->peer;
643         struct mapperd *mapper = __get_mapperd(peer);
644         struct map_node *mn;
645         uint64_t i, pos, max_objidx = calc_map_obj(map);
646         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647                                                         mapper->mbportno, X_ALLOC);
648         if (!req){
649                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
650                 goto out_err;
651         }
652         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653                         mapheader_size + max_objidx * objectsize_in_map);
654         if (r < 0){
655                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
656                 goto out_put;
657         }
658         char *target = xseg_get_target(peer->xseg, req);
659         strncpy(target, map->volume, req->targetlen);
660         char *data = xseg_get_data(peer->xseg, req);
661         mapheader_to_map(map, data);
662         pos = mapheader_size;
663         req->op = X_WRITE;
664         req->size = req->datalen;
665         req->offset = 0;
666
667         if (map->size % block_size)
668                 max_objidx++;
669         for (i = 0; i < max_objidx; i++) {
670                 mn = find_object(map, i);
671                 if (!mn){
672                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673                                         (unsigned long long) i, map->volume);
674                         goto out_put;
675                 }
676                 object_to_map(data+pos, mn);
677                 pos += objectsize_in_map;
678         }
679         r = xseg_set_req_data(peer->xseg, req, pr);
680         if (r < 0){
681                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
682                                 map->volume);
683                 goto out_put;
684         }
685         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
686         if (p == NoPort){
687                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
688                                 map->volume);
689                 goto out_unset;
690         }
691         r = xseg_signal(peer->xseg, p);
692         if (r < 0)
693                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
694
695         map->flags |= MF_MAP_WRITING;
696         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
697         return req;
698
699 out_unset:
700         xseg_get_req_data(peer->xseg, req, &dummy);
701 out_put:
702         xseg_put_request(peer->xseg, req, pr->portno);
703 out_err:
704         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
705         return NULL;
706 }
707
708 static int write_map(struct peer_req* pr, struct map *map)
709 {
710         int r = 0;
711         struct peerd *peer = pr->peer;
712         map->flags |= MF_MAP_WRITING;
713         struct xseg_request *req = __write_map(pr, map);
714         if (!req)
715                 return -1;
716         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
717         if (req->state & XS_FAILED)
718                 r = -1;
719         xseg_put_request(peer->xseg, req, pr->portno);
720         map->flags &= ~MF_MAP_WRITING;
721         return r;
722 }
723
724 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
725 {
726         int r;
727         xport p;
728         struct xseg_request *req;
729         struct peerd *peer = pr->peer;
730         struct mapperd *mapper = __get_mapperd(peer);
731         void *dummy;
732
733         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
734
735         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
736         if (!req){
737                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
738                                 m->volume);
739                 goto out_fail;
740         }
741
742         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
743         if (r < 0){
744                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
745                                 m->volume);
746                 goto out_put;
747         }
748
749         char *reqtarget = xseg_get_target(peer->xseg, req);
750         if (!reqtarget)
751                 goto out_put;
752         strncpy(reqtarget, m->volume, req->targetlen);
753         req->op = X_READ;
754         req->size = block_size;
755         req->offset = 0;
756         r = xseg_set_req_data(peer->xseg, req, pr);
757         if (r < 0){
758                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
759                                 m->volume);
760                 goto out_put;
761         }
762         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
763         if (p == NoPort){ 
764                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
765                                 m->volume);
766                 goto out_unset;
767         }
768         r = xseg_signal(peer->xseg, p);
769         
770         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
771         return req;
772
773 out_unset:
774         xseg_get_req_data(peer->xseg, req, &dummy);
775 out_put:
776         xseg_put_request(peer->xseg, req, pr->portno);
777 out_fail:
778         return NULL;
779 }
780
781 static int read_map (struct map *map, unsigned char *buf)
782 {
783         char nulls[SHA256_DIGEST_SIZE];
784         memset(nulls, 0, SHA256_DIGEST_SIZE);
785
786         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
787         if (r) {
788                 XSEGLOG2(&lc, D, "Read zeros");
789                 //read error;
790                 return -1;
791         }
792         //type 1, archip type, type 0 pithos map
793         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
795         uint64_t pos;
796         uint64_t i, nr_objs;
797         struct map_node *map_node;
798         if (type) {
799                 uint64_t pos = 0;
800                 map->version = *(uint32_t *) (buf + pos);
801                 pos += sizeof(uint32_t);
802                 map->size = *(uint64_t *) (buf + pos);
803                 pos += sizeof(uint64_t);
804                 nr_objs = map->size / block_size;
805                 if (map->size % block_size)
806                         nr_objs++;
807                 map_node = calloc(nr_objs, sizeof(struct map_node));
808                 if (!map_node)
809                         return -1;
810
811                 for (i = 0; i < nr_objs; i++) {
812                         map_node[i].map = map;
813                         map_node[i].objectidx = i;
814                         map_node[i].waiters = 0;
815                         map_node[i].ref = 1;
816                         map_node[i].cond = st_cond_new(); //FIXME err check;
817                         map_to_object(&map_node[i], buf + pos);
818                         pos += objectsize_in_map;
819                         r = insert_object(map, &map_node[i]); //FIXME error check
820                 }
821         } else {
822                 pos = 0;
823                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
825                 if (!map_node)
826                         return -1;
827                 for (i = 0; i < max_nr_objs; i++) {
828                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
829                                 break;
830                         map_node[i].objectidx = i;
831                         map_node[i].map = map;
832                         map_node[i].waiters = 0;
833                         map_node[i].ref = 1;
834                         map_node[i].cond = st_cond_new(); //FIXME err check;
835                         pithosmap_to_object(&map_node[i], buf + pos);
836                         pos += SHA256_DIGEST_SIZE;
837                         r = insert_object(map, &map_node[i]); //FIXME error check
838                 }
839                 map->size = i * block_size;
840         }
841         print_map(map);
842         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
843         return 0;
844
845         //FIXME cleanup on error
846 }
847
848 static int load_map(struct peer_req *pr, struct map *map)
849 {
850         int r = 0;
851         struct xseg_request *req;
852         struct peerd *peer = pr->peer;
853         map->flags |= MF_MAP_LOADING;
854         req = __load_map(pr, map);
855         if (!req)
856                 return -1;
857         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
858         map->flags &= ~MF_MAP_LOADING;
859         if (req->state & XS_FAILED){
860                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
861                 xseg_put_request(peer->xseg, req, pr->portno);
862                 return -1;
863         }
864         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
865         xseg_put_request(peer->xseg, req, pr->portno);
866         return r;
867 }
868
869 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
870                                                 uint32_t flags)
871 {
872         int r;
873         xport p;
874         struct xseg_request *req;
875         struct peerd *peer = pr->peer;
876         struct mapperd *mapper = __get_mapperd(peer);
877         void *dummy;
878
879         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
880
881         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
882         if (!req){
883                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
884                                 m->volume);
885                 goto out_fail;
886         }
887
888         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
889         if (r < 0){
890                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
891                                 m->volume);
892                 goto out_put;
893         }
894
895         char *reqtarget = xseg_get_target(peer->xseg, req);
896         if (!reqtarget)
897                 goto out_put;
898         strncpy(reqtarget, m->volume, req->targetlen);
899         req->op = X_OPEN;
900         req->size = block_size;
901         req->offset = 0;
902         if (!(flags & MF_FORCE))
903                 req->flags = XF_NOSYNC;
904         r = xseg_set_req_data(peer->xseg, req, pr);
905         if (r < 0){
906                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
907                                 m->volume);
908                 goto out_put;
909         }
910         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
911         if (p == NoPort){ 
912                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
913                                 m->volume);
914                 goto out_unset;
915         }
916         r = xseg_signal(peer->xseg, p);
917         
918         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
919         return req;
920
921 out_unset:
922         xseg_get_req_data(peer->xseg, req, &dummy);
923 out_put:
924         xseg_put_request(peer->xseg, req, pr->portno);
925 out_fail:
926         return NULL;
927 }
928
929 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
930 {
931         int err;
932         struct xseg_request *req;
933         struct peerd *peer = pr->peer;
934
935         map->flags |= MF_MAP_OPENING;
936         req = __open_map(pr, map, flags);
937         if (!req){
938                 map->flags &= ~MF_MAP_OPENING;
939                 return -1;
940         }
941         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
942         map->flags &= ~MF_MAP_OPENING;
943         err = req->state & XS_FAILED;
944         xseg_put_request(peer->xseg, req, pr->portno);
945         if (err)
946                 return -1;
947         else 
948                 map->flags |= MF_MAP_EXCLUSIVE;
949         return 0;
950 }
951
952 /*
953  * copy up functions
954  */
955
956 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
957 {
958         int r = 0;
959         if (mn){
960                 XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
961                                 req, mn, mio);
962                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
963                 if (r == -XHASH_ERESIZE) {
964                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
965                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
966                         if (!new_hashmap)
967                                 goto out;
968                         mio->copyups_nodes = new_hashmap;
969                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
970                 }
971                 if (r < 0)
972                         XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
973                                         req, mn, mio);
974         }
975         else {
976                 XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
977                                 req, mio);
978                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
979                 if (r == -XHASH_ERESIZE) {
980                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
981                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
982                         if (!new_hashmap)
983                                 goto out;
984                         mio->copyups_nodes = new_hashmap;
985                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
986                 }
987                 if (r < 0)
988                         XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
989                                         req, mio);
990         }
991 out:
992         return r;
993 }
994
995 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
996 {
997         struct map_node *mn;
998         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
999         if (r < 0){
1000                 XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1001                 return NULL;
1002         }
1003         XSEGLOG2(&lc, I, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1004         return mn;
1005 }
1006
1007 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1008 {
1009         struct mapperd *mapper = __get_mapperd(peer);
1010         struct mapper_io *mio = __get_mapper_io(pr);
1011         struct map *map = mn->map;
1012         void *dummy;
1013         int r = -1;
1014         xport p;
1015
1016         uint32_t newtargetlen;
1017         char new_target[MAX_OBJECT_LEN + 1];
1018         unsigned char sha[SHA256_DIGEST_SIZE];
1019
1020         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1021
1022         char tmp[XSEG_MAX_TARGETLEN + 1];
1023         uint32_t tmplen;
1024         strncpy(tmp, map->volume, map->volumelen);
1025         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1026         tmp[XSEG_MAX_TARGETLEN] = 0;
1027         tmplen = strlen(tmp);
1028         SHA256((unsigned char *)tmp, tmplen, sha);
1029         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1030         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1031
1032
1033         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1034                 goto copyup_zeroblock;
1035
1036         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1037                                                 mapper->bportno, X_ALLOC);
1038         if (!req){
1039                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1040                 goto out_err;
1041         }
1042         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1043                                 sizeof(struct xseg_request_copy));
1044         if (r < 0){
1045                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1046                 goto out_put;
1047         }
1048
1049         char *target = xseg_get_target(peer->xseg, req);
1050         strncpy(target, new_target, req->targetlen);
1051
1052         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1053         strncpy(xcopy->target, mn->object, mn->objectlen);
1054         xcopy->targetlen = mn->objectlen;
1055
1056         req->offset = 0;
1057         req->size = block_size;
1058         req->op = X_COPY;
1059         r = xseg_set_req_data(peer->xseg, req, pr);
1060         if (r<0){
1061                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1062                 goto out_put;
1063         }
1064         r = __set_copyup_node(mio, req, mn);
1065         if (r < 0)
1066                 goto out_unset;
1067         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1068         if (p == NoPort) {
1069                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1070                 goto out_mapper_unset;
1071         }
1072         xseg_signal(peer->xseg, p);
1073 //      mio->copyups++;
1074
1075         mn->flags |= MF_OBJECT_COPYING;
1076         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1077         return req;
1078
1079 out_mapper_unset:
1080         __set_copyup_node(mio, req, NULL);
1081 out_unset:
1082         xseg_get_req_data(peer->xseg, req, &dummy);
1083 out_put:
1084         xseg_put_request(peer->xseg, req, pr->portno);
1085 out_err:
1086         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1087         return NULL;
1088
1089 copyup_zeroblock:
1090         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1091                         "Proceeding in writing the new object in map");
1092         /* construct a tmp map_node for writing purposes */
1093         struct map_node newmn = *mn;
1094         newmn.flags = MF_OBJECT_EXIST;
1095         strncpy(newmn.object, new_target, newtargetlen);
1096         newmn.object[newtargetlen] = 0;
1097         newmn.objectlen = newtargetlen;
1098         newmn.objectidx = mn->objectidx; 
1099         req = object_write(peer, pr, map, &newmn);
1100         r = __set_copyup_node(mio, req, mn);
1101         if (r < 0)
1102                 return NULL;
1103         if (!req){
1104                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1105                                 "\n\t of map %s [%llu]",
1106                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1107                 return NULL;
1108         }
1109         mn->flags |= MF_OBJECT_WRITING;
1110         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1111         return req;
1112 }
1113
1114 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
1115 {
1116         void *dummy;
1117         struct peerd *peer = pr->peer;
1118         struct mapperd *mapper = __get_mapperd(peer);
1119         struct mapper_io *mio = __get_mapper_io(pr);
1120         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1121                                                         mapper->bportno, X_ALLOC);
1122         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1123         if (!req){
1124                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1125                 goto out_err;
1126         }
1127         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1128         if (r < 0){
1129                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1130                 goto out_put;
1131         }
1132         char *target = xseg_get_target(peer->xseg, req);
1133         strncpy(target, mn->object, req->targetlen);
1134         req->op = X_DELETE;
1135         req->size = req->datalen;
1136         req->offset = 0;
1137         r = xseg_set_req_data(peer->xseg, req, pr);
1138         if (r < 0){
1139                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1140                 goto out_put;
1141         }
1142         r = __set_copyup_node(mio, req, mn);
1143         if (r < 0)
1144                 goto out_unset;
1145         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1146         if (p == NoPort){
1147                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1148                 goto out_mapper_unset;
1149         }
1150         r = xseg_signal(peer->xseg, p);
1151         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1152         return req;
1153
1154 out_mapper_unset:
1155         __set_copyup_node(mio, req, NULL);
1156 out_unset:
1157         xseg_get_req_data(peer->xseg, req, &dummy);
1158 out_put:
1159         xseg_put_request(peer->xseg, req, pr->portno);
1160 out_err:
1161         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1162         return NULL;
1163 }
1164
1165 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1166 {
1167         void *dummy;
1168         struct peerd *peer = pr->peer;
1169         struct mapperd *mapper = __get_mapperd(peer);
1170         struct mapper_io *mio = __get_mapper_io(pr);
1171         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1172                                                         mapper->mbportno, X_ALLOC);
1173         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1174         if (!req){
1175                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1176                 goto out_err;
1177         }
1178         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1179         if (r < 0){
1180                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1181                 goto out_put;
1182         }
1183         char *target = xseg_get_target(peer->xseg, req);
1184         strncpy(target, map->volume, req->targetlen);
1185         req->op = X_DELETE;
1186         req->size = req->datalen;
1187         req->offset = 0;
1188         r = xseg_set_req_data(peer->xseg, req, pr);
1189         if (r < 0){
1190                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1191                 goto out_put;
1192         }
1193         /* do not check return value. just make sure there is no node set */
1194         __set_copyup_node(mio, req, NULL);
1195         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1196         if (p == NoPort){
1197                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1198                 goto out_unset;
1199         }
1200         r = xseg_signal(peer->xseg, p);
1201         map->flags |= MF_MAP_DELETING;
1202         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1203         return req;
1204
1205 out_unset:
1206         xseg_get_req_data(peer->xseg, req, &dummy);
1207 out_put:
1208         xseg_put_request(peer->xseg, req, pr->portno);
1209 out_err:
1210         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1211         return  NULL;
1212 }
1213
1214
1215 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1216 {
1217         struct map_node *mn = find_object(map, index);
1218         if (mn)
1219                 mn->ref++;
1220         return mn;
1221 }
1222
1223 static inline void put_mapnode(struct map_node *mn)
1224 {
1225         mn->ref--;
1226         if (!mn->ref){
1227                 //clean up mn
1228                 st_cond_destroy(mn->cond);
1229         }
1230 }
1231
1232 static inline void __get_map(struct map *map)
1233 {
1234         map->ref++;
1235 }
1236
1237 static inline void put_map(struct map *map)
1238 {
1239         struct map_node *mn;
1240         map->ref--;
1241         if (!map->ref){
1242                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1243                 //clean up map
1244                 uint64_t i;
1245                 for (i = 0; i < calc_map_obj(map); i++) {
1246                         mn = get_mapnode(map, i);
1247                         if (mn) {
1248                                 //make sure all pending operations on all objects are completed
1249                                 if (mn->flags & MF_OBJECT_NOT_READY){
1250                                         //this should never happen...
1251                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1252                                 }
1253                                 mn->flags &= MF_OBJECT_DESTROYED;
1254                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1255                                 put_mapnode(mn); //matcing get_mapnode;
1256                                 //assert mn->ref == 0;
1257                         }
1258                 }
1259                 mn = find_object(map, 0);
1260                 if (mn)
1261                         free(mn);
1262                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1263                 free(map);
1264         }
1265 }
1266
1267 static struct map * create_map(struct mapperd *mapper, char *name,
1268                                 uint32_t namelen, uint32_t flags)
1269 {
1270         int r;
1271         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1272                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1273                                         namelen, MAX_VOLUME_LEN);
1274                 return NULL;
1275         }
1276         struct map *m = malloc(sizeof(struct map));
1277         if (!m){
1278                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1279                 goto out_err;
1280         }
1281         m->size = -1;
1282         if (flags & MF_ARCHIP){
1283                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1284                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1285                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1286                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1287                 m->version = 1; /* keep this hardcoded for now */
1288         }
1289         else {
1290                 strncpy(m->volume, name, namelen);
1291                 m->volume[namelen] = 0;
1292                 m->volumelen = namelen;
1293                 m->version = 0; /* version 0 should be pithos maps */
1294         }
1295         m->flags = 0;
1296         m->objects = xhash_new(3, INTEGER); 
1297         if (!m->objects){
1298                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1299                                 m->volume);
1300                 goto out_map;
1301         }
1302         m->ref = 1;
1303         m->waiters = 0;
1304         m->cond = st_cond_new(); //FIXME err check;
1305         r = insert_map(mapper, m);
1306         if (r < 0){
1307                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1308                 goto out_hash;
1309         }
1310
1311         return m;
1312
1313 out_hash:
1314         xhash_free(m->objects);
1315 out_map:
1316         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1317         free(m);
1318 out_err:
1319         return NULL;
1320 }
1321
1322
1323
1324 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1325 {
1326         struct peerd *peer = pr->peer;
1327         struct mapperd *mapper = __get_mapperd(peer);
1328         (void)mapper;
1329         struct mapper_io *mio = __get_mapper_io(pr);
1330         struct map_node *mn = __get_copyup_node(mio, req);
1331
1332         __set_copyup_node(mio, req, NULL);
1333
1334         XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1335         mio->del_pending--;
1336         if (req->state & XS_FAILED){
1337                 mio->err = 1;
1338         }
1339         if (mn){
1340                 XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1341                                 mn, mn->object, mio, req);
1342                 signal_mapnode(mn);
1343         }
1344         else {
1345                 XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1346                                 mio, req);
1347         }
1348         xseg_put_request(peer->xseg, req, pr->portno);
1349         signal_pr(pr);
1350 }
1351
1352 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1353 {
1354         struct peerd *peer = pr->peer;
1355         struct mapperd *mapper = __get_mapperd(peer);
1356         (void)mapper;
1357         struct mapper_io *mio = __get_mapper_io(pr);
1358         struct map_node *mn = __get_copyup_node(mio, req);
1359         if (!mn){
1360                 XSEGLOG2(&lc, E, "Cannot get map node");
1361                 goto out_err;
1362         }
1363         __set_copyup_node(mio, req, NULL);
1364
1365         if (req->state & XS_FAILED){
1366                 XSEGLOG2(&lc, E, "Req failed");
1367                 mn->flags &= ~MF_OBJECT_COPYING;
1368                 mn->flags &= ~MF_OBJECT_WRITING;
1369                 goto out_err;
1370         }
1371         if (req->op == X_WRITE) {
1372                 char *target = xseg_get_target(peer->xseg, req);
1373                 (void)target;
1374                 //printf("handle object write replyi\n");
1375                 __set_copyup_node(mio, req, NULL);
1376                 //assert mn->flags & MF_OBJECT_WRITING
1377                 mn->flags &= ~MF_OBJECT_WRITING;
1378
1379                 struct map_node tmp;
1380                 char *data = xseg_get_data(peer->xseg, req);
1381                 map_to_object(&tmp, (unsigned char *) data);
1382                 mn->flags |= MF_OBJECT_EXIST;
1383                 if (mn->flags != MF_OBJECT_EXIST){
1384                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1385                         goto out_err;
1386                 }
1387                 //assert mn->flags & MF_OBJECT_EXIST
1388                 strncpy(mn->object, tmp.object, tmp.objectlen);
1389                 mn->object[tmp.objectlen] = 0;
1390                 mn->objectlen = tmp.objectlen;
1391                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1392                 mio->copyups--;
1393                 signal_mapnode(mn);
1394         } else if (req->op == X_COPY) {
1395         //      issue write_object;
1396                 mn->flags &= ~MF_OBJECT_COPYING;
1397                 struct map *map = mn->map;
1398                 if (!map){
1399                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1400                         goto out_err;
1401                 }
1402
1403                 /* construct a tmp map_node for writing purposes */
1404                 char *target = xseg_get_target(peer->xseg, req);
1405                 struct map_node newmn = *mn;
1406                 newmn.flags = MF_OBJECT_EXIST;
1407                 strncpy(newmn.object, target, req->targetlen);
1408                 newmn.object[req->targetlen] = 0;
1409                 newmn.objectlen = req->targetlen;
1410                 newmn.objectidx = mn->objectidx; 
1411                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1412                 if (!xreq){
1413                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1414                                         "\n\t of map %s [%llu]",
1415                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1416                         goto out_err;
1417                 }
1418                 mn->flags |= MF_OBJECT_WRITING;
1419                 __set_copyup_node (mio, xreq, mn);
1420
1421                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1422         } else {
1423                 //wtf??
1424                 ;
1425         }
1426
1427 out:
1428         xseg_put_request(peer->xseg, req, pr->portno);
1429         return;
1430
1431 out_err:
1432         mio->copyups--;
1433         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1434         mio->err = 1;
1435         if (mn)
1436                 signal_mapnode(mn);
1437         goto out;
1438
1439 }
1440
1441 struct r2o {
1442         struct map_node *mn;
1443         uint64_t offset;
1444         uint64_t size;
1445 };
1446
1447 static int req2objs(struct peer_req *pr, struct map *map, int write)
1448 {
1449         int r = 0;
1450         struct peerd *peer = pr->peer;
1451         struct mapper_io *mio = __get_mapper_io(pr);
1452         char *target = xseg_get_target(peer->xseg, pr->req);
1453         uint32_t nr_objs = calc_nr_obj(pr->req);
1454         uint64_t size = sizeof(struct xseg_reply_map) + 
1455                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1456         uint32_t idx, i, ready;
1457         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1458         struct map_node *mn;
1459         mio->copyups = 0;
1460         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1461
1462         /* get map_nodes of request */
1463         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1464         if (!mns){
1465                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1466                 return -1;
1467         }
1468         idx = 0;
1469         rem_size = pr->req->size;
1470         obj_index = pr->req->offset / block_size;
1471         obj_offset = pr->req->offset & (block_size -1); //modulo
1472         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1473         mn = get_mapnode(map, obj_index);
1474         if (!mn) {
1475                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1476                 r = -1;
1477                 goto out;
1478         }
1479         mns[idx].mn = mn;
1480         mns[idx].offset = obj_offset;
1481         mns[idx].size = obj_size;
1482         rem_size -= obj_size;
1483         while (rem_size > 0) {
1484                 idx++;
1485                 obj_index++;
1486                 obj_offset = 0;
1487                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1488                 rem_size -= obj_size;
1489                 mn = get_mapnode(map, obj_index);
1490                 if (!mn) {
1491                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1492                         r = -1;
1493                         goto out;
1494                 }
1495                 mns[idx].mn = mn;
1496                 mns[idx].offset = obj_offset;
1497                 mns[idx].size = obj_size;
1498         }
1499         if (write) {
1500                 ready = 0;
1501                 int can_wait = 0;
1502                 mio->cb=copyup_cb;
1503                 while (ready < (idx + 1)){
1504                         ready = 0;
1505                         for (i = 0; i < (idx+1); i++) {
1506                                 mn = mns[i].mn;
1507                                 //do copyups
1508                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1509                                         if (can_wait){
1510                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1511                                                 if (mn->flags & MF_OBJECT_DESTROYED){
1512                                                         mio->err = 1;
1513                                                 }
1514                                                 if (mio->err){
1515                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1516                                                         if (!mio->copyups){
1517                                                                 r = -1;
1518                                                                 goto out;
1519                                                         }
1520                                                 }
1521                                         }
1522                                 }
1523                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1524                                         //calc new_target, copy up object
1525                                         if (copyup_object(peer, mn, pr) == NULL){
1526                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1527                                                 mio->err = 1;
1528                                         } else {
1529                                                 mio->copyups++;
1530                                         }
1531                                 } else {
1532                                         ready++;
1533                                 }
1534                         }
1535                         can_wait = 1;
1536                 }
1537                 /*
1538 pending_copyups:
1539                 while(mio->copyups > 0){
1540                         mio->cb = copyup_cb;
1541                         wait_on_pr(pr, 0);
1542                         ta--;
1543                         st_cond_wait(pr->cond);
1544                 }
1545                 */
1546         }
1547
1548         if (mio->err){
1549                 r = -1;
1550                 XSEGLOG2(&lc, E, "Mio->err");
1551                 goto out;
1552         }
1553
1554         /* resize request to fit reply */
1555         char buf[XSEG_MAX_TARGETLEN];
1556         strncpy(buf, target, pr->req->targetlen);
1557         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1558         if (r < 0) {
1559                 XSEGLOG2(&lc, E, "Cannot resize request");
1560                 goto out;
1561         }
1562         target = xseg_get_target(peer->xseg, pr->req);
1563         strncpy(target, buf, pr->req->targetlen);
1564
1565         /* structure reply */
1566         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1567         reply->cnt = nr_objs;
1568         for (i = 0; i < (idx+1); i++) {
1569                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1570                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1571                 reply->segs[i].offset = mns[i].offset;
1572                 reply->segs[i].size = mns[i].size;
1573         }
1574 out:
1575         for (i = 0; i < idx; i++) {
1576                 put_mapnode(mns[i].mn);
1577         }
1578         free(mns);
1579         return r;
1580 }
1581
1582 static int do_dropcache(struct peer_req *pr, struct map *map)
1583 {
1584         struct map_node *mn;
1585         struct peerd *peer = pr->peer;
1586         struct mapperd *mapper = __get_mapperd(peer);
1587         uint64_t i;
1588         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1589         map->flags |= MF_MAP_DROPPING_CACHE;
1590         for (i = 0; i < calc_map_obj(map); i++) {
1591                 mn = get_mapnode(map, i);
1592                 if (mn) {
1593                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1594                                 //make sure all pending operations on all objects are completed
1595                                 if (mn->flags & MF_OBJECT_NOT_READY){
1596                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1597                                 }
1598                                 mn->flags &= MF_OBJECT_DESTROYED;
1599                         }
1600                         put_mapnode(mn);
1601                 }
1602         }
1603         map->flags &= ~MF_MAP_DROPPING_CACHE;
1604         map->flags |= MF_MAP_DESTROYED;
1605         remove_map(mapper, map);
1606         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1607         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1608         return 0;
1609 }
1610
1611 static int do_info(struct peer_req *pr, struct map *map)
1612 {
1613         struct peerd *peer = pr->peer;
1614         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1615         xinfo->size = map->size;
1616         return 0;
1617 }
1618
1619
1620 static int do_close(struct peer_req *pr, struct map *map)
1621 {
1622         if (map->flags & MF_MAP_EXCLUSIVE){
1623                 /* do not drop cache if close failed and map not deleted */
1624                 if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
1625                         return -1;
1626         }
1627         return do_dropcache(pr, map);
1628 }
1629
1630 static int do_destroy(struct peer_req *pr, struct map *map)
1631 {
1632         uint64_t i;
1633         struct peerd *peer = pr->peer;
1634         struct mapper_io *mio = __get_mapper_io(pr);
1635         struct map_node *mn;
1636         struct xseg_request *req;
1637         
1638         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1639         map->flags |= MF_MAP_DELETING;
1640         req = delete_map(pr, map);
1641         if (!req)
1642                 return -1;
1643         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1644         if (req->state & XS_FAILED){
1645                 xseg_put_request(peer->xseg, req, pr->portno);
1646                 map->flags &= ~MF_MAP_DELETING;
1647                 return -1;
1648         }
1649         xseg_put_request(peer->xseg, req, pr->portno);
1650         //FIXME
1651         uint64_t nr_obj = calc_map_obj(map);
1652         uint64_t deleted = 0;
1653         while (deleted < nr_obj){ 
1654                 deleted = 0;
1655                 for (i = 0; i < nr_obj; i++){
1656                         mn = get_mapnode(map, i);
1657                         if (mn) {
1658                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1659                                         if (mn->flags & MF_OBJECT_EXIST){
1660                                                 //make sure all pending operations on all objects are completed
1661                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1662                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1663                                                 }
1664                                                 req = delete_object(pr, mn);
1665                                                 if (!req)
1666                                                         if (mio->del_pending){
1667                                                                 goto wait_pending;
1668                                                         } else {
1669                                                                 continue;
1670                                                         }
1671                                                 else {
1672                                                         mio->del_pending++;
1673                                                 }
1674                                         }
1675                                         mn->flags &= MF_OBJECT_DESTROYED;
1676                                 }
1677                                 put_mapnode(mn);
1678                         }
1679                         deleted++;
1680                 }
1681 wait_pending:
1682                 mio->cb = deletion_cb;
1683                 wait_on_pr(pr, mio->del_pending > 0);
1684         }
1685         mio->cb = NULL;
1686         map->flags &= ~MF_MAP_DELETING;
1687         map->flags |= MF_MAP_DELETED;
1688         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1689         return do_close(pr, map);
1690 }
1691
1692 static int do_mapr(struct peer_req *pr, struct map *map)
1693 {
1694         struct peerd *peer = pr->peer;
1695         int r = req2objs(pr, map, 0);
1696         if  (r < 0){
1697                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1698                                 map->volume, 
1699                                 (unsigned long long) pr->req->offset, 
1700                                 (unsigned long long) (pr->req->offset + pr->req->size));
1701                 return -1;
1702         }
1703         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1704                         map->volume, 
1705                         (unsigned long long) pr->req->offset, 
1706                         (unsigned long long) (pr->req->offset + pr->req->size));
1707         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1708                         (unsigned long long) pr->req->offset,
1709                         (unsigned long long) pr->req->size);
1710         char buf[XSEG_MAX_TARGETLEN+1];
1711         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1712         int i;
1713         for (i = 0; i < reply->cnt; i++) {
1714                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1715                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1716                 buf[reply->segs[i].targetlen] = 0;
1717                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1718                                 (unsigned long long) reply->segs[i].offset,
1719                                 (unsigned long long) reply->segs[i].size);
1720         }
1721         return 0;
1722 }
1723
1724 static int do_mapw(struct peer_req *pr, struct map *map)
1725 {
1726         struct peerd *peer = pr->peer;
1727         int r = req2objs(pr, map, 1);
1728         if  (r < 0){
1729                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1730                                 map->volume, 
1731                                 (unsigned long long) pr->req->offset, 
1732                                 (unsigned long long) (pr->req->offset + pr->req->size));
1733                 return -1;
1734         }
1735         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1736                         map->volume, 
1737                         (unsigned long long) pr->req->offset, 
1738                         (unsigned long long) (pr->req->offset + pr->req->size));
1739         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1740                         (unsigned long long) pr->req->offset,
1741                         (unsigned long long) pr->req->size);
1742         char buf[XSEG_MAX_TARGETLEN+1];
1743         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1744         int i;
1745         for (i = 0; i < reply->cnt; i++) {
1746                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1747                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1748                 buf[reply->segs[i].targetlen] = 0;
1749                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1750                                 (unsigned long long) reply->segs[i].offset,
1751                                 (unsigned long long) reply->segs[i].size);
1752         }
1753         return 0;
1754 }
1755
1756 //here map is the parent map
1757 static int do_clone(struct peer_req *pr, struct map *map)
1758 {
1759         /*
1760         FIXME check if clone map exists
1761         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1762         if (clonemap)
1763                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1764                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1765                 return -1;
1766         */
1767
1768         int r;
1769         char buf[XSEG_MAX_TARGETLEN];
1770         struct peerd *peer = pr->peer;
1771         struct mapperd *mapper = __get_mapperd(peer);
1772         char *target = xseg_get_target(peer->xseg, pr->req);
1773         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1774         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1775         struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
1776                                                 MF_ARCHIP);
1777         if (!clonemap) 
1778                 return -1;
1779
1780         if (xclone->size == -1)
1781                 clonemap->size = map->size;
1782         else
1783                 clonemap->size = xclone->size;
1784         if (clonemap->size < map->size){
1785                 target = xseg_get_target(peer->xseg, pr->req);
1786                 strncpy(buf, target, pr->req->targetlen);
1787                 buf[pr->req->targetlen] = 0;
1788                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1789                                 "\n\t for requested clone %s",
1790                                 (unsigned long long) xclone->size,
1791                                 (unsigned long long) map->size, buf);
1792                 goto out_err;
1793         }
1794
1795         //alloc and init map_nodes
1796         unsigned long c = clonemap->size/block_size + 1;
1797         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1798         if (!map_nodes){
1799                 goto out_err;
1800         }
1801         int i;
1802         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1803                 struct map_node *mn = get_mapnode(map, i);
1804                 if (mn) {
1805                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1806                         map_nodes[i].objectlen = mn->objectlen;
1807                         put_mapnode(mn);
1808                 } else {
1809                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1810                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1811                 }
1812                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1813                 map_nodes[i].flags = 0;
1814                 map_nodes[i].objectidx = i;
1815                 map_nodes[i].map = clonemap;
1816                 map_nodes[i].ref = 1;
1817                 map_nodes[i].waiters = 0;
1818                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1819                 r = insert_object(clonemap, &map_nodes[i]);
1820                 if (r < 0){
1821                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1822                         goto out_err;
1823                 }
1824         }
1825         r = write_map(pr, clonemap);
1826         if (r < 0){
1827                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1828                 goto out_err;
1829         }
1830         return 0;
1831
1832 out_err:
1833         put_map(clonemap);
1834         return -1;
1835 }
1836
1837 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1838 {
1839         int r, opened = 0;
1840         if (flags & MF_EXCLUSIVE){
1841                 r = open_map(pr, map, flags);
1842                 if (r < 0) {
1843                         if (flags & MF_FORCE){
1844                                 return -1;
1845                         }
1846                 } else {
1847                         opened = 1;
1848                 }
1849         }
1850         r = load_map(pr, map);
1851         if (r < 0 && opened){
1852                 close_map(pr, map);
1853         }
1854         return r;
1855 }
1856
1857 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1858                         uint32_t flags)
1859 {
1860         int r;
1861         struct peerd *peer = pr->peer;
1862         struct mapperd *mapper = __get_mapperd(peer);
1863         struct map *map = find_map_len(mapper, name, namelen, flags);
1864         if (!map){
1865                 if (flags & MF_LOAD){
1866                         map = create_map(mapper, name, namelen, flags);
1867                         if (!map)
1868                                 return NULL;
1869                         r = open_load_map(pr, map, flags);
1870                         if (r < 0){
1871                                 do_dropcache(pr, map);
1872                                 return NULL;
1873                         }
1874                 } else {
1875                         return NULL;
1876                 }
1877         } else if (map->flags & MF_MAP_DESTROYED){
1878                 return NULL;
1879         }
1880         __get_map(map);
1881         return map;
1882
1883 }
1884
1885 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1886                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1887 {
1888         //struct peerd *peer = pr->peer;
1889         struct map *map;
1890 start:
1891         map = get_map(pr, name, namelen, flags);
1892         if (!map)
1893                 return -1;
1894         if (map->flags & MF_MAP_NOT_READY){
1895                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1896                 put_map(map);
1897                 goto start;
1898         }
1899         int r = action(pr, map);
1900         //always drop cache if map not read exclusively
1901         if (!(map->flags & MF_MAP_EXCLUSIVE))
1902                 do_dropcache(pr, map);
1903         signal_map(map);
1904         put_map(map);
1905         return r;
1906 }
1907
1908 void * handle_info(struct peer_req *pr)
1909 {
1910         struct peerd *peer = pr->peer;
1911         char *target = xseg_get_target(peer->xseg, pr->req);
1912         int r = map_action(do_info, pr, target, pr->req->targetlen,
1913                                 MF_ARCHIP|MF_LOAD);
1914         if (r < 0)
1915                 fail(peer, pr);
1916         else
1917                 complete(peer, pr);
1918         ta--;
1919         return NULL;
1920 }
1921
1922 void * handle_clone(struct peer_req *pr)
1923 {
1924         int r;
1925         struct peerd *peer = pr->peer;
1926         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1927         if (!xclone) {
1928                 r = -1;
1929                 goto out;
1930         }
1931         if (xclone->targetlen){
1932                 //support clone only from pithos
1933                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1934                                         MF_LOAD);
1935         } else {
1936                 if (!xclone->size){
1937                         r = -1;
1938                 } else {
1939                         //FIXME
1940                         struct map *map;
1941                         char *target = xseg_get_target(peer->xseg, pr->req);
1942                         XSEGLOG2(&lc, I, "Creating volume");
1943                         map = get_map(pr, target, pr->req->targetlen,
1944                                                 MF_ARCHIP|MF_LOAD);
1945                         if (map){
1946                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1947                                 if (map->ref <= 2) //initial one + one ref from __get_map
1948                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1949                                 put_map(map); //matches get_map
1950                                 r = -1;
1951                                 goto out;
1952                         }
1953                         //create a new empty map of size
1954                         map = create_map(mapper, target, pr->req->targetlen,
1955                                                 MF_ARCHIP);
1956                         if (!map){
1957                                 r = -1;
1958                                 goto out;
1959                         }
1960                         map->size = xclone->size;
1961                         //populate_map with zero objects;
1962                         uint64_t nr_objs = xclone->size / block_size;
1963                         if (xclone->size % block_size)
1964                                 nr_objs++;
1965
1966                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1967                         if (!map_nodes){
1968                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1969                                 r = -1;
1970                                 goto out;
1971                         }
1972                         uint64_t i;
1973                         for (i = 0; i < nr_objs; i++) {
1974                                 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1975                                 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1976                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1977                                 map_nodes[i].flags = 0;
1978                                 map_nodes[i].objectidx = i;
1979                                 map_nodes[i].map = map;
1980                                 map_nodes[i].ref = 1;
1981                                 map_nodes[i].waiters = 0;
1982                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1983                                 r = insert_object(map, &map_nodes[i]);
1984                                 if (r < 0){
1985                                         do_dropcache(pr, map);
1986                                         r = -1;
1987                                         goto out;
1988                                 }
1989                         }
1990                         r = write_map(pr, map);
1991                         if (r < 0){
1992                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1993                                 do_dropcache(pr, map);
1994                                 goto out;
1995                         }
1996                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1997                         r = 0;
1998                         do_dropcache(pr, map); //drop cache here for consistency
1999                 }
2000         }
2001 out:
2002         if (r < 0)
2003                 fail(peer, pr);
2004         else
2005                 complete(peer, pr);
2006         ta--;
2007         return NULL;
2008 }
2009
2010 void * handle_mapr(struct peer_req *pr)
2011 {
2012         struct peerd *peer = pr->peer;
2013         char *target = xseg_get_target(peer->xseg, pr->req);
2014         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2015                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2016         if (r < 0)
2017                 fail(peer, pr);
2018         else
2019                 complete(peer, pr);
2020         ta--;
2021         return NULL;
2022 }
2023
2024 void * handle_mapw(struct peer_req *pr)
2025 {
2026         struct peerd *peer = pr->peer;
2027         char *target = xseg_get_target(peer->xseg, pr->req);
2028         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2029                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2030         if (r < 0)
2031                 fail(peer, pr);
2032         else
2033                 complete(peer, pr);
2034         XSEGLOG2(&lc, D, "Ta: %d", ta);
2035         ta--;
2036         return NULL;
2037 }
2038
2039 void * handle_destroy(struct peer_req *pr)
2040 {
2041         struct peerd *peer = pr->peer;
2042         char *target = xseg_get_target(peer->xseg, pr->req);
2043         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2044                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2045         if (r < 0)
2046                 fail(peer, pr);
2047         else
2048                 complete(peer, pr);
2049         ta--;
2050         return NULL;
2051 }
2052
2053 void * handle_close(struct peer_req *pr)
2054 {
2055         struct peerd *peer = pr->peer;
2056         char *target = xseg_get_target(peer->xseg, pr->req);
2057         //here we do not want to load
2058         int r = map_action(do_close, pr, target, pr->req->targetlen,
2059                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2060         if (r < 0)
2061                 fail(peer, pr);
2062         else
2063                 complete(peer, pr);
2064         ta--;
2065         return NULL;
2066 }
2067
2068 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
2069                         struct xseg_request *req)
2070 {
2071         //struct mapperd *mapper = __get_mapperd(peer);
2072         struct mapper_io *mio = __get_mapper_io(pr);
2073         void *(*action)(struct peer_req *) = NULL;
2074
2075         mio->state = ACCEPTED;
2076         mio->err = 0;
2077         mio->cb = NULL;
2078         switch (pr->req->op) {
2079                 /* primary xseg operations of mapper */
2080                 case X_CLONE: action = handle_clone; break;
2081                 case X_MAPR: action = handle_mapr; break;
2082                 case X_MAPW: action = handle_mapw; break;
2083 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2084                 case X_INFO: action = handle_info; break;
2085                 case X_DELETE: action = handle_destroy; break;
2086                 case X_CLOSE: action = handle_close; break;
2087                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2088         }
2089         if (action){
2090                 ta++;
2091                 mio->active = 1;
2092                 st_thread_create(action, pr, 0, 0);
2093         }
2094         return 0;
2095
2096 }
2097
2098 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2099                 enum dispatch_reason reason)
2100 {
2101         struct mapperd *mapper = __get_mapperd(peer);
2102         (void) mapper;
2103         struct mapper_io *mio = __get_mapper_io(pr);
2104         (void) mio;
2105
2106
2107         if (reason == dispatch_accept)
2108                 dispatch_accepted(peer, pr, req);
2109         else {
2110                 if (mio->cb){
2111                         mio->cb(pr, req);
2112                 } else { 
2113                         signal_pr(pr);
2114                 }
2115         }
2116         return 0;
2117 }
2118
2119 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2120 {
2121         int i;
2122
2123         //FIXME error checks
2124         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2125         peer->priv = mapperd;
2126         mapper = mapperd;
2127         mapper->hashmaps = xhash_new(3, STRING);
2128
2129         printf("%llu \n", MAX_VOLUME_SIZE);
2130         for (i = 0; i < peer->nr_ops; i++) {
2131                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2132                 mio->copyups_nodes = xhash_new(3, INTEGER);
2133                 mio->copyups = 0;
2134                 mio->err = 0;
2135                 mio->active = 0;
2136                 peer->peer_reqs[i].priv = mio;
2137         }
2138
2139         for (i = 0; i < argc; i++) {
2140                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2141                         mapper->bportno = atoi(argv[i+1]);
2142                         i += 1;
2143                         continue;
2144                 }
2145                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2146                         mapper->mbportno = atoi(argv[i+1]);
2147                         i += 1;
2148                         continue;
2149                 }
2150                 /* enforce only one thread */
2151                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2152                         int t = atoi(argv[i+1]);
2153                         if (t != 1) {
2154                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2155                                 return -1;
2156                         }
2157                         i += 1;
2158                         continue;
2159                 }
2160         }
2161
2162         const struct sched_param param = { .sched_priority = 99 };
2163         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2164
2165
2166 //      test_map(peer);
2167
2168         return 0;
2169 }
2170
2171 /* FIXME this should not be here */
2172 int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
2173 {
2174         struct xseg *xseg = peer->xseg;
2175         xport portno_start = peer->portno_start;
2176         xport portno_end = peer->portno_end;
2177         struct peer_req *pr;
2178         xport i;
2179         int  r, c = 0;
2180         struct xseg_request *req, *received;
2181         xseg_prepare_wait(xseg, portno_start);
2182         while(1) {
2183                 XSEGLOG2(&lc, D, "Attempting to check for reply");
2184                 c = 1;
2185                 while (c){
2186                         c = 0;
2187                         for (i = portno_start; i <= portno_end; i++) {
2188                                 received = xseg_receive(xseg, i, 0);
2189                                 if (received) {
2190                                         c = 1;
2191                                         r =  xseg_get_req_data(xseg, received, (void **) &pr);
2192                                         if (r < 0 || !pr || received != expected_req){
2193                                                 XSEGLOG2(&lc, W, "Received request with no pr data\n");
2194                                                 xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
2195                                                 if (p == NoPort){
2196                                                         XSEGLOG2(&lc, W, "Could not respond stale request");
2197                                                         xseg_put_request(xseg, received, portno_start);
2198                                                         continue;
2199                                                 } else {
2200                                                         xseg_signal(xseg, p);
2201                                                 }
2202                                         } else {
2203                                                 xseg_cancel_wait(xseg, portno_start);
2204                                                 return 0;
2205                                         }
2206                                 }
2207                         }
2208                 }
2209                 xseg_wait_signal(xseg, 1000000UL);
2210         }
2211 }
2212
2213
2214 void custom_peer_finalize(struct peerd *peer)
2215 {
2216         struct mapperd *mapper = __get_mapperd(peer);
2217         struct peer_req *pr = alloc_peer_req(peer);
2218         if (!pr){
2219                 XSEGLOG2(&lc, E, "Cannot get peer request");
2220                 return;
2221         }
2222         int r;
2223         struct map *map;
2224         struct xseg_request *req;
2225         xhash_iter_t it;
2226         xhashidx key, val;
2227         xhash_iter_init(mapper->hashmaps, &it);
2228         while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
2229                 map = (struct map *)val;
2230                 if (!(map->flags & MF_MAP_EXCLUSIVE))
2231                         continue;
2232                 map->flags |= MF_MAP_CLOSING;
2233                 req = __close_map(pr, map);
2234                 if (!req)
2235                         continue;
2236                 wait_reply(peer, req);
2237                 if (!(req->state & XS_SERVED))
2238                         XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
2239                 map->flags &= ~MF_MAP_CLOSING;
2240                 xseg_put_request(peer->xseg, req, pr->portno);
2241         }
2242         return;
2243
2244
2245 }
2246
2247 void print_obj(struct map_node *mn)
2248 {
2249         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2250                         (unsigned long long) mn->objectidx, mn->object, 
2251                         (unsigned int) mn->objectlen, 
2252                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2253 }
2254
2255 void print_map(struct map *m)
2256 {
2257         uint64_t nr_objs = m->size/block_size;
2258         if (m->size % block_size)
2259                 nr_objs++;
2260         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2261                         m->volume, m->volumelen, 
2262                         (unsigned long long) m->size, 
2263                         (unsigned long long) nr_objs,
2264                         m->version);
2265         uint64_t i;
2266         struct map_node *mn;
2267         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2268                 return;
2269         for (i = 0; i < nr_objs; i++) {
2270                 mn = find_object(m, i);
2271                 if (!mn){
2272                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2273                         continue;
2274                 }
2275                 print_obj(mn);
2276         }
2277 }
2278
2279 /*
2280 void test_map(struct peerd *peer)
2281 {
2282         int i,j, ret;
2283         //struct sha256_ctx sha256ctx;
2284         unsigned char buf[SHA256_DIGEST_SIZE];
2285         char buf_new[XSEG_MAX_TARGETLEN + 20];
2286         struct map *m = malloc(sizeof(struct map));
2287         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2288         m->volume[XSEG_MAX_TARGETLEN] = 0;
2289         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2290         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2291         m->volumelen = XSEG_MAX_TARGETLEN;
2292         m->size = 100*block_size;
2293         m->objects = xhash_new(3, INTEGER);
2294         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2295         for (i = 0; i < 100; i++) {
2296                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2297                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2298                 
2299                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2300                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2301                 }
2302                 map_node[i].objectidx = i;
2303                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2304                 map_node[i].flags = MF_OBJECT_EXIST;
2305                 ret = insert_object(m, &map_node[i]);
2306         }
2307
2308         char *data = malloc(block_size);
2309         mapheader_to_map(m, data);
2310         uint64_t pos = mapheader_size;
2311
2312         for (i = 0; i < 100; i++) {
2313                 map_node = find_object(m, i);
2314                 if (!map_node){
2315                         printf("no object node %d \n", i);
2316                         exit(1);
2317                 }
2318                 object_to_map(data+pos, map_node);
2319                 pos += objectsize_in_map;
2320         }
2321 //      print_map(m);
2322
2323         struct map *m2 = malloc(sizeof(struct map));
2324         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2325         m->volume[XSEG_MAX_TARGETLEN] = 0;
2326         m->volumelen = XSEG_MAX_TARGETLEN;
2327
2328         m2->objects = xhash_new(3, INTEGER);
2329         ret = read_map(peer, m2, data);
2330 //      print_map(m2);
2331
2332         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2333         ssize_t r, sum = 0;
2334         while (sum < block_size) {
2335                 r = write(fd, data + sum, block_size -sum);
2336                 if (r < 0){
2337                         perror("write");
2338                         printf("write error\n");
2339                         exit(1);
2340                 } 
2341                 sum += r;
2342         }
2343         close(fd);
2344         map_node = find_object(m, 0);
2345         free(map_node);
2346         free(m);
2347 }
2348 */