add versions to maps
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <peer.h>
7 #include <time.h>
8 #include <xtypes/xlock.h>
9 #include <xtypes/xhash.h>
10 #include <xseg/protocol.h>
11 #include <sys/stat.h>
12 #include <fcntl.h>
13 #include <errno.h>
14 #include <sched.h>
15 #include <sys/syscall.h>
16 #include <openssl/sha.h>
17
18 /* general mapper flags */
19 #define MF_LOAD         (1 << 0)
20 #define MF_EXCLUSIVE    (1 << 1)
21 #define MF_FORCE        (1 << 2)
22 #define MF_ARCHIP       (1 << 3)
23
24 #ifndef SHA256_DIGEST_SIZE
25 #define SHA256_DIGEST_SIZE 32
26 #endif
27 /* hex representation of sha256 value takes up double the sha256 size */
28 #define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
29
30 #define block_size (1<<22) //FIXME this should be defined here?
31
32 /* transparency byte + max object len in disk */
33 #define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
34
35 /* Map header contains:
36  *      map version
37  *      volume size
38  */
39 #define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
40
41
42 #define MAPPER_PREFIX "archip_"
43 #define MAPPER_PREFIX_LEN 7
44
45 #define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46 #define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
47
48 #if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49 #error  "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
50 #endif
51
52 #define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
53
54 #if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55 #error  "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
56 #endif
57
58 #define MAX_VOLUME_SIZE \
59 ((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
60
61
62 char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63 #define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
64
65 /* dispatch_internal mapper states */
66 enum mapper_state {
67         ACCEPTED = 0,
68         WRITING = 1,
69         COPYING = 2,
70         DELETING = 3,
71         DROPPING_CACHE = 4
72 };
73
74 typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
75
76
77 /* mapper object flags */
78 #define MF_OBJECT_EXIST         (1 << 0)
79 #define MF_OBJECT_COPYING       (1 << 1)
80 #define MF_OBJECT_WRITING       (1 << 2)
81 #define MF_OBJECT_DELETING      (1 << 3)
82 #define MF_OBJECT_DELETED       (1 << 4)
83 #define MF_OBJECT_DESTROYED     (1 << 5)
84
85 #define MF_OBJECT_NOT_READY     (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
86                                         MF_OBJECT_DELETING)
87 struct map_node {
88         uint32_t flags;
89         uint32_t objectidx;
90         uint32_t objectlen;
91         char object[MAX_OBJECT_LEN + 1];        /* NULL terminated string */
92         struct map *map;
93         uint32_t ref;
94         uint32_t waiters;
95         st_cond_t cond;
96 };
97
98
99 #define wait_on_pr(__pr, __condition__)         \
100         while (__condition__){                  \
101                 ta--;                           \
102                 __get_mapper_io(pr)->active = 0;\
103                 XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
104                 st_cond_wait(__pr->cond);       \
105         }
106
107 #define wait_on_mapnode(__mn, __condition__)    \
108         while (__condition__){                  \
109                 ta--;                           \
110                 __mn->waiters++;                \
111                 XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
112                         ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
113                 st_cond_wait(__mn->cond);       \
114         }
115
116 #define wait_on_map(__map, __condition__)       \
117         while (__condition__){                  \
118                 ta--;                           \
119                 __map->waiters++;               \
120                 XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
121                                    __map, __map->volume, __map->waiters, ta); \
122                 st_cond_wait(__map->cond);      \
123         }
124
125 #define signal_pr(__pr)                         \
126         do {                                    \
127                 if (!__get_mapper_io(pr)->active){\
128                         ta++;                   \
129                         XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
130                         __get_mapper_io(pr)->active = 1;\
131                         st_cond_signal(__pr->cond);     \
132                 }                               \
133         }while(0)
134
135 #define signal_map(__map)                       \
136         do {                                    \
137                 if (__map->waiters) {           \
138                         ta += 1;                \
139                         XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
140                         ta: %u",  __map, __map->volume, __map->waiters, ta); \
141                         __map->waiters--;       \
142                         st_cond_signal(__map->cond);    \
143                 }                               \
144         }while(0)
145
146 #define signal_mapnode(__mn)                    \
147         do {                                    \
148                 if (__mn->waiters) {            \
149                         ta += __mn->waiters;    \
150                         XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
151                         %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
152                         __mn->waiters = 0;      \
153                         st_cond_broadcast(__mn->cond);  \
154                 }                               \
155         }while(0)
156
157
158 /* map flags */
159 #define MF_MAP_LOADING          (1 << 0)
160 #define MF_MAP_DESTROYED        (1 << 1)
161 #define MF_MAP_WRITING          (1 << 2)
162 #define MF_MAP_DELETING         (1 << 3)
163 #define MF_MAP_DROPPING_CACHE   (1 << 4)
164 #define MF_MAP_EXCLUSIVE        (1 << 5)
165 #define MF_MAP_OPENING          (1 << 6)
166 #define MF_MAP_CLOSING          (1 << 7)
167
168 #define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169                                         MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
170
171 struct map {
172         uint32_t version;
173         uint32_t flags;
174         uint64_t size;
175         uint32_t volumelen;
176         char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
177         xhash_t *objects;       /* obj_index --> map_node */
178         uint32_t ref;
179         uint32_t waiters;
180         st_cond_t cond;
181 };
182
183 struct mapperd {
184         xport bportno;          /* blocker that accesses data */
185         xport mbportno;         /* blocker that accesses maps */
186         xhash_t *hashmaps; // hash_function(target) --> struct map
187 };
188
189 struct mapper_io {
190         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
191         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
192         struct map_node *copyup_node;
193         volatile int err;                       /* error flag */
194         volatile uint64_t del_pending;
195         uint64_t delobj;
196         uint64_t dcobj;
197         cb_t cb;
198         enum mapper_state state;
199         volatile int active;
200 };
201
202 /* global vars */
203 struct mapperd *mapper;
204
205 void print_map(struct map *m);
206
207
208 void custom_peer_usage()
209 {
210         fprintf(stderr, "Custom peer options: \n"
211                         "-bp  : port for block blocker(!)\n"
212                         "-mbp : port for map blocker\n"
213                         "\n");
214 }
215
216
217 /*
218  * Helper functions
219  */
220
221 static inline struct mapperd * __get_mapperd(struct peerd *peer)
222 {
223         return (struct mapperd *) peer->priv;
224 }
225
226 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
227 {
228         return (struct mapper_io *) pr->priv;
229 }
230
231 static inline uint64_t calc_map_obj(struct map *map)
232 {
233         if (map->size == -1)
234                 return 0;
235         uint64_t nr_objs = map->size / block_size;
236         if (map->size % block_size)
237                 nr_objs++;
238         return nr_objs;
239 }
240
241 static uint32_t calc_nr_obj(struct xseg_request *req)
242 {
243         unsigned int r = 1;
244         uint64_t rem_size = req->size;
245         uint64_t obj_offset = req->offset & (block_size -1); //modulo
246         uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247         rem_size -= obj_size;
248         while (rem_size > 0) {
249                 obj_size = (rem_size > block_size) ? block_size : rem_size;
250                 rem_size -= obj_size;
251                 r++;
252         }
253
254         return r;
255 }
256
257 /* hexlify function. 
258  * Unsafe. Doesn't check if data length is odd!
259  */
260 static void hexlify(unsigned char *data, char *hex)
261 {
262         int i;
263         for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264                 sprintf(hex+2*i, "%02x", data[i]);
265 }
266
267 static void unhexlify(char *hex, unsigned char *data)
268 {
269         int i;
270         char c;
271         for (i=0; i<SHA256_DIGEST_LENGTH; i++){
272                 data[i] = 0;
273                 c = hex[2*i];
274                 if (isxdigit(c)){
275                         if (isdigit(c)){
276                                 c-= '0';
277                         }
278                         else {
279                                 c = tolower(c);
280                                 c = c-'a' + 10;
281                         }
282                 }
283                 else {
284                         c = 0;
285                 }
286                 data[i] |= (c << 4) & 0xF0;
287                 c = hex[2*i+1];
288                 if (isxdigit(c)){
289                         if (isdigit(c)){
290                                 c-= '0';
291                         }
292                         else {
293                                 c = tolower(c);
294                                 c = c-'a' + 10;
295                         }
296                 }
297                 else {
298                         c = 0;
299                 }
300                 data[i] |= c & 0x0F;
301         }
302 }
303 /*
304  * Maps handling functions
305  */
306
307 static struct map * find_map(struct mapperd *mapper, char *volume)
308 {
309         struct map *m = NULL;
310         int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
311                                 (xhashidx *) &m);
312         if (r < 0)
313                 return NULL;
314         return m;
315 }
316
317 static struct map * find_map_len(struct mapperd *mapper, char *target,
318                                         uint32_t targetlen, uint32_t flags)
319 {
320         char buf[XSEG_MAX_TARGETLEN+1];
321         if (flags & MF_ARCHIP){
322                 strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323                 strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324                 buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325                 targetlen += MAPPER_PREFIX_LEN;
326         }
327         else {
328                 strncpy(buf, target, targetlen);
329                 buf[targetlen] = 0;
330         }
331
332         if (targetlen > MAX_VOLUME_LEN){
333                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334                                         targetlen, MAX_VOLUME_LEN);
335                 return NULL;
336         }
337
338         XSEGLOG2(&lc, D, "looking up map %s, len %u",
339                         buf, targetlen);
340         return find_map(mapper, buf);
341 }
342
343
344 static int insert_map(struct mapperd *mapper, struct map *map)
345 {
346         int r = -1;
347
348         if (find_map(mapper, map->volume)){
349                 XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
350                 goto out;
351         }
352
353         XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
354                         map->volume, strlen(map->volume), (unsigned long) map);
355         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356         while (r == -XHASH_ERESIZE) {
357                 xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
359                 if (!new_hashmap){
360                         XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361                                         (unsigned long long) shift);
362                         goto out;
363                 }
364                 mapper->hashmaps = new_hashmap;
365                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
366         }
367 out:
368         return r;
369 }
370
371 static int remove_map(struct mapperd *mapper, struct map *map)
372 {
373         int r = -1;
374
375         //assert no pending pr on map
376
377         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378         while (r == -XHASH_ERESIZE) {
379                 xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
381                 if (!new_hashmap){
382                         XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383                                         (unsigned long long) shift);
384                         goto out;
385                 }
386                 mapper->hashmaps = new_hashmap;
387                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
388         }
389 out:
390         return r;
391 }
392
393 static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
394 {
395         int r;
396         xport p;
397         struct peerd *peer = pr->peer;
398         struct xseg_request *req;
399         struct mapperd *mapper = __get_mapperd(peer);
400         void *dummy;
401
402         XSEGLOG2(&lc, I, "Closing map %s", map->volume);
403
404         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
405         if (!req){
406                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
407                                 map->volume);
408                 goto out_err;
409         }
410
411         r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
412         if (r < 0){
413                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
414                                 map->volume);
415                 goto out_put;
416         }
417
418         char *reqtarget = xseg_get_target(peer->xseg, req);
419         if (!reqtarget)
420                 goto out_put;
421         strncpy(reqtarget, map->volume, req->targetlen);
422         req->op = X_CLOSE;
423         req->size = block_size;
424         req->offset = 0;
425         r = xseg_set_req_data(peer->xseg, req, pr);
426         if (r < 0){
427                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
428                                 map->volume);
429                 goto out_put;
430         }
431         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
432         if (p == NoPort){
433                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
434                                 map->volume);
435                 goto out_unset;
436         }
437         r = xseg_signal(peer->xseg, p);
438
439         XSEGLOG2(&lc, I, "Map %s closing", map->volume);
440         return req;
441
442 out_unset:
443         xseg_get_req_data(peer->xseg, req, &dummy);
444 out_put:
445         xseg_put_request(peer->xseg, req, pr->portno);
446 out_err:
447         return NULL;
448 }
449
450 static int close_map(struct peer_req *pr, struct map *map)
451 {
452         int err;
453         struct xseg_request *req;
454         struct peerd *peer = pr->peer;
455
456         map->flags |= MF_MAP_CLOSING;
457         req = __close_map(pr, map);
458         if (!req)
459                 return -1;
460         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461         map->flags &= ~MF_MAP_CLOSING;
462         err = req->state & XS_FAILED;
463         xseg_put_request(peer->xseg, req, pr->portno);
464         if (err)
465                 return -1;
466         return 0;
467 }
468
469 /*
470 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
471                                 char *target, uint32_t targetlen, struct map **m)
472 {
473         struct mapperd *mapper = __get_mapperd(peer);
474         int r;
475         *m = find_map(mapper, target, targetlen);
476         if (*m) {
477                 XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478                 if ((*m)->flags & MF_MAP_NOT_READY) {
479                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
480                         XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
481                         return MF_PENDING;
482                 //} else if ((*m)->flags & MF_MAP_DESTROYED){
483                 //      return -1;
484                 // 
485                 }else {
486                         XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
487                         return 0;
488                 }
489         }
490         r = open_map(peer, pr, target, targetlen, 0);
491         if (r < 0)
492                 return -1; //error
493         return MF_PENDING;      
494 }
495 */
496 /*
497  * Object handling functions
498  */
499
500 struct map_node *find_object(struct map *map, uint64_t obj_index)
501 {
502         struct map_node *mn;
503         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
504         if (r < 0)
505                 return NULL;
506         return mn;
507 }
508
509 static int insert_object(struct map *map, struct map_node *mn)
510 {
511         //FIXME no find object first
512         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513         if (r == -XHASH_ERESIZE) {
514                 unsigned long shift = xhash_grow_size_shift(map->objects);
515                 map->objects = xhash_resize(map->objects, shift, NULL);
516                 if (!map->objects)
517                         return -1;
518                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
519         }
520         return r;
521 }
522
523
524 /*
525  * map read/write functions 
526  */
527 static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
528 {
529         hexlify(buf, mn->object);
530         mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531         mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532         mn->flags = MF_OBJECT_EXIST;
533 }
534
535 static inline void map_to_object(struct map_node *mn, unsigned char *buf)
536 {
537         char c = buf[0];
538         mn->flags = 0;
539         if (c){
540                 mn->flags |= MF_OBJECT_EXIST;
541                 strcpy(mn->object, MAPPER_PREFIX);
542                 hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543                 mn->object[MAX_OBJECT_LEN] = 0;
544                 mn->objectlen = strlen(mn->object);
545         }
546         else {
547                 hexlify(buf+1, mn->object);
548                 mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549                 mn->objectlen = strlen(mn->object);
550         }
551
552 }
553
554 static inline void object_to_map(char* buf, struct map_node *mn)
555 {
556         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
557         if (buf[0]){
558                 /* strip common prefix */
559                 unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
560         }
561         else {
562                 unhexlify(mn->object, (unsigned char *)(buf+1));
563         }
564 }
565
566 static inline void mapheader_to_map(struct map *m, char *buf)
567 {
568         uint64_t pos = 0;
569         memcpy(buf + pos, &m->version, sizeof(m->version));
570         pos += sizeof(m->version);
571         memcpy(buf + pos, &m->size, sizeof(m->size));
572         pos += sizeof(m->size);
573 }
574
575
576 static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577                                 struct map *map, struct map_node *mn)
578 {
579         void *dummy;
580         struct mapperd *mapper = __get_mapperd(peer);
581         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582                                                         mapper->mbportno, X_ALLOC);
583         if (!req){
584                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
585                                 "(Map: %s [%llu]",
586                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
587                 goto out_err;
588         }
589         int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
590         if (r < 0){
591                 XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
592                                 "(Map: %s [%llu]",
593                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
594                 goto out_put;
595         }
596         char *target = xseg_get_target(peer->xseg, req);
597         strncpy(target, map->volume, req->targetlen);
598         req->size = objectsize_in_map;
599         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
600         req->op = X_WRITE;
601         char *data = xseg_get_data(peer->xseg, req);
602         object_to_map(data, mn);
603
604         r = xseg_set_req_data(peer->xseg, req, pr);
605         if (r < 0){
606                 XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
607                                 "(Map: %s [%llu]",
608                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
609                 goto out_put;
610         }
611         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
612         if (p == NoPort){
613                 XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
614                                 "(Map: %s [%llu]",
615                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
616                 goto out_unset;
617         }
618         r = xseg_signal(peer->xseg, p);
619         if (r < 0)
620                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
621
622         XSEGLOG2(&lc, I, "Writing object %s \n\t"
623                         "Map: %s [%llu]",
624                         mn->object, map->volume, (unsigned long long) mn->objectidx);
625
626         return req;
627
628 out_unset:
629         xseg_get_req_data(peer->xseg, req, &dummy);
630 out_put:
631         xseg_put_request(peer->xseg, req, pr->portno);
632 out_err:
633         XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
634                         "(Map: %s [%llu]",
635                         mn->object, map->volume, (unsigned long long) mn->objectidx);
636         return NULL;
637 }
638
639 static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
640 {
641         void *dummy;
642         struct peerd *peer = pr->peer;
643         struct mapperd *mapper = __get_mapperd(peer);
644         struct map_node *mn;
645         uint64_t i, pos, max_objidx = calc_map_obj(map);
646         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647                                                         mapper->mbportno, X_ALLOC);
648         if (!req){
649                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
650                 goto out_err;
651         }
652         int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653                         mapheader_size + max_objidx * objectsize_in_map);
654         if (r < 0){
655                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
656                 goto out_put;
657         }
658         char *target = xseg_get_target(peer->xseg, req);
659         strncpy(target, map->volume, req->targetlen);
660         char *data = xseg_get_data(peer->xseg, req);
661         mapheader_to_map(map, data);
662         pos = mapheader_size;
663         req->op = X_WRITE;
664         req->size = req->datalen;
665         req->offset = 0;
666
667         if (map->size % block_size)
668                 max_objidx++;
669         for (i = 0; i < max_objidx; i++) {
670                 mn = find_object(map, i);
671                 if (!mn){
672                         XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673                                         (unsigned long long) i, map->volume);
674                         goto out_put;
675                 }
676                 object_to_map(data+pos, mn);
677                 pos += objectsize_in_map;
678         }
679         r = xseg_set_req_data(peer->xseg, req, pr);
680         if (r < 0){
681                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
682                                 map->volume);
683                 goto out_put;
684         }
685         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
686         if (p == NoPort){
687                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
688                                 map->volume);
689                 goto out_unset;
690         }
691         r = xseg_signal(peer->xseg, p);
692         if (r < 0)
693                 XSEGLOG2(&lc, W, "Cannot signal port %u", p);
694
695         map->flags |= MF_MAP_WRITING;
696         XSEGLOG2(&lc, I, "Writing map %s", map->volume);
697         return req;
698
699 out_unset:
700         xseg_get_req_data(peer->xseg, req, &dummy);
701 out_put:
702         xseg_put_request(peer->xseg, req, pr->portno);
703 out_err:
704         XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
705         return NULL;
706 }
707
708 static int write_map(struct peer_req* pr, struct map *map)
709 {
710         int r = 0;
711         struct peerd *peer = pr->peer;
712         map->flags |= MF_MAP_WRITING;
713         struct xseg_request *req = __write_map(pr, map);
714         if (!req)
715                 return -1;
716         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
717         if (req->state & XS_FAILED)
718                 r = -1;
719         xseg_put_request(peer->xseg, req, pr->portno);
720         map->flags &= ~MF_MAP_WRITING;
721         return r;
722 }
723
724 static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
725 {
726         int r;
727         xport p;
728         struct xseg_request *req;
729         struct peerd *peer = pr->peer;
730         struct mapperd *mapper = __get_mapperd(peer);
731         void *dummy;
732
733         XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
734
735         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
736         if (!req){
737                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
738                                 m->volume);
739                 goto out_fail;
740         }
741
742         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
743         if (r < 0){
744                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
745                                 m->volume);
746                 goto out_put;
747         }
748
749         char *reqtarget = xseg_get_target(peer->xseg, req);
750         if (!reqtarget)
751                 goto out_put;
752         strncpy(reqtarget, m->volume, req->targetlen);
753         req->op = X_READ;
754         req->size = block_size;
755         req->offset = 0;
756         r = xseg_set_req_data(peer->xseg, req, pr);
757         if (r < 0){
758                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
759                                 m->volume);
760                 goto out_put;
761         }
762         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
763         if (p == NoPort){ 
764                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
765                                 m->volume);
766                 goto out_unset;
767         }
768         r = xseg_signal(peer->xseg, p);
769         
770         XSEGLOG2(&lc, I, "Map %s loading", m->volume);
771         return req;
772
773 out_unset:
774         xseg_get_req_data(peer->xseg, req, &dummy);
775 out_put:
776         xseg_put_request(peer->xseg, req, pr->portno);
777 out_fail:
778         return NULL;
779 }
780
781 static int read_map (struct map *map, unsigned char *buf)
782 {
783         char nulls[SHA256_DIGEST_SIZE];
784         memset(nulls, 0, SHA256_DIGEST_SIZE);
785
786         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
787         if (r) {
788                 XSEGLOG2(&lc, D, "Read zeros");
789                 //read error;
790                 return -1;
791         }
792         //type 1, archip type, type 0 pithos map
793         int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794         XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
795         uint64_t pos;
796         uint64_t i, nr_objs;
797         struct map_node *map_node;
798         if (type) {
799                 uint64_t pos = 0;
800                 map->version = *(uint32_t *) (buf + pos);
801                 pos += sizeof(uint32_t);
802                 map->size = *(uint64_t *) (buf + pos);
803                 pos += sizeof(uint64_t);
804                 nr_objs = map->size / block_size;
805                 if (map->size % block_size)
806                         nr_objs++;
807                 map_node = calloc(nr_objs, sizeof(struct map_node));
808                 if (!map_node)
809                         return -1;
810
811                 for (i = 0; i < nr_objs; i++) {
812                         map_node[i].map = map;
813                         map_node[i].objectidx = i;
814                         map_node[i].waiters = 0;
815                         map_node[i].ref = 1;
816                         map_node[i].cond = st_cond_new(); //FIXME err check;
817                         map_to_object(&map_node[i], buf + pos);
818                         pos += objectsize_in_map;
819                         r = insert_object(map, &map_node[i]); //FIXME error check
820                 }
821         } else {
822                 pos = 0;
823                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
825                 if (!map_node)
826                         return -1;
827                 for (i = 0; i < max_nr_objs; i++) {
828                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
829                                 break;
830                         map_node[i].objectidx = i;
831                         map_node[i].map = map;
832                         map_node[i].waiters = 0;
833                         map_node[i].ref = 1;
834                         map_node[i].cond = st_cond_new(); //FIXME err check;
835                         pithosmap_to_object(&map_node[i], buf + pos);
836                         pos += SHA256_DIGEST_SIZE;
837                         r = insert_object(map, &map_node[i]); //FIXME error check
838                 }
839                 map->size = i * block_size;
840         }
841         print_map(map);
842         XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
843         return 0;
844
845         //FIXME cleanup on error
846 }
847
848 static int load_map(struct peer_req *pr, struct map *map)
849 {
850         int r = 0;
851         struct xseg_request *req;
852         struct peerd *peer = pr->peer;
853         map->flags |= MF_MAP_LOADING;
854         req = __load_map(pr, map);
855         if (!req)
856                 return -1;
857         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
858         map->flags &= ~MF_MAP_LOADING;
859         if (req->state & XS_FAILED){
860                 XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
861                 xseg_put_request(peer->xseg, req, pr->portno);
862                 return -1;
863         }
864         r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
865         xseg_put_request(peer->xseg, req, pr->portno);
866         return r;
867 }
868
869 static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
870                                                 uint32_t flags)
871 {
872         int r;
873         xport p;
874         struct xseg_request *req;
875         struct peerd *peer = pr->peer;
876         struct mapperd *mapper = __get_mapperd(peer);
877         void *dummy;
878
879         XSEGLOG2(&lc, I, "Opening map %s", m->volume);
880
881         req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
882         if (!req){
883                 XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
884                                 m->volume);
885                 goto out_fail;
886         }
887
888         r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
889         if (r < 0){
890                 XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
891                                 m->volume);
892                 goto out_put;
893         }
894
895         char *reqtarget = xseg_get_target(peer->xseg, req);
896         if (!reqtarget)
897                 goto out_put;
898         strncpy(reqtarget, m->volume, req->targetlen);
899         req->op = X_OPEN;
900         req->size = block_size;
901         req->offset = 0;
902         if (!(flags & MF_FORCE))
903                 req->flags = XF_NOSYNC;
904         r = xseg_set_req_data(peer->xseg, req, pr);
905         if (r < 0){
906                 XSEGLOG2(&lc, E, "Cannot set request data for map %s",
907                                 m->volume);
908                 goto out_put;
909         }
910         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
911         if (p == NoPort){ 
912                 XSEGLOG2(&lc, E, "Cannot submit request for map %s",
913                                 m->volume);
914                 goto out_unset;
915         }
916         r = xseg_signal(peer->xseg, p);
917         
918         XSEGLOG2(&lc, I, "Map %s opening", m->volume);
919         return req;
920
921 out_unset:
922         xseg_get_req_data(peer->xseg, req, &dummy);
923 out_put:
924         xseg_put_request(peer->xseg, req, pr->portno);
925 out_fail:
926         return NULL;
927 }
928
929 static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
930 {
931         int err;
932         struct xseg_request *req;
933         struct peerd *peer = pr->peer;
934
935         map->flags |= MF_MAP_OPENING;
936         req = __open_map(pr, map, flags);
937         if (!req)
938                 return -1;
939         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
940         map->flags &= ~MF_MAP_OPENING;
941         err = req->state & XS_FAILED;
942         xseg_put_request(peer->xseg, req, pr->portno);
943         if (err)
944                 return -1;
945         else 
946                 map->flags |= MF_MAP_EXCLUSIVE;
947         return 0;
948 }
949
950 /*
951  * copy up functions
952  */
953
954 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
955 {
956         int r = 0;
957         if (mn){
958                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
959                 if (r == -XHASH_ERESIZE) {
960                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
961                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
962                         if (!new_hashmap)
963                                 goto out;
964                         mio->copyups_nodes = new_hashmap;
965                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
966                 }
967         }
968         else {
969                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
970                 if (r == -XHASH_ERESIZE) {
971                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
972                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
973                         if (!new_hashmap)
974                                 goto out;
975                         mio->copyups_nodes = new_hashmap;
976                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
977                 }
978         }
979 out:
980         return r;
981 }
982
983 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
984 {
985         struct map_node *mn;
986         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
987         if (r < 0)
988                 return NULL;
989         return mn;
990 }
991
992 static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
993 {
994         struct mapperd *mapper = __get_mapperd(peer);
995         struct mapper_io *mio = __get_mapper_io(pr);
996         struct map *map = mn->map;
997         void *dummy;
998         int r = -1;
999         xport p;
1000
1001         uint32_t newtargetlen;
1002         char new_target[MAX_OBJECT_LEN + 1];
1003         unsigned char sha[SHA256_DIGEST_SIZE];
1004
1005         strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1006
1007         char tmp[XSEG_MAX_TARGETLEN + 1];
1008         uint32_t tmplen;
1009         strncpy(tmp, map->volume, map->volumelen);
1010         sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1011         tmp[XSEG_MAX_TARGETLEN] = 0;
1012         tmplen = strlen(tmp);
1013         SHA256((unsigned char *)tmp, tmplen, sha);
1014         hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1015         newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1016
1017
1018         if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1019                 goto copyup_zeroblock;
1020
1021         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1022                                                 mapper->bportno, X_ALLOC);
1023         if (!req){
1024                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1025                 goto out_err;
1026         }
1027         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1028                                 sizeof(struct xseg_request_copy));
1029         if (r < 0){
1030                 XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1031                 goto out_put;
1032         }
1033
1034         char *target = xseg_get_target(peer->xseg, req);
1035         strncpy(target, new_target, req->targetlen);
1036
1037         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1038         strncpy(xcopy->target, mn->object, mn->objectlen);
1039         xcopy->targetlen = mn->objectlen;
1040
1041         req->offset = 0;
1042         req->size = block_size;
1043         req->op = X_COPY;
1044         r = xseg_set_req_data(peer->xseg, req, pr);
1045         if (r<0){
1046                 XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1047                 goto out_put;
1048         }
1049         r = __set_copyup_node(mio, req, mn);
1050         p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1051         if (p == NoPort) {
1052                 XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1053                 goto out_unset;
1054         }
1055         xseg_signal(peer->xseg, p);
1056 //      mio->copyups++;
1057
1058         mn->flags |= MF_OBJECT_COPYING;
1059         XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1060         return req;
1061
1062 out_unset:
1063         r = __set_copyup_node(mio, req, NULL);
1064         xseg_get_req_data(peer->xseg, req, &dummy);
1065 out_put:
1066         xseg_put_request(peer->xseg, req, pr->portno);
1067 out_err:
1068         XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1069         return NULL;
1070
1071 copyup_zeroblock:
1072         XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1073                         "Proceeding in writing the new object in map");
1074         /* construct a tmp map_node for writing purposes */
1075         struct map_node newmn = *mn;
1076         newmn.flags = MF_OBJECT_EXIST;
1077         strncpy(newmn.object, new_target, newtargetlen);
1078         newmn.object[newtargetlen] = 0;
1079         newmn.objectlen = newtargetlen;
1080         newmn.objectidx = mn->objectidx; 
1081         req = object_write(peer, pr, map, &newmn);
1082         r = __set_copyup_node(mio, req, mn);
1083         if (!req){
1084                 XSEGLOG2(&lc, E, "Object write returned error for object %s"
1085                                 "\n\t of map %s [%llu]",
1086                                 mn->object, map->volume, (unsigned long long) mn->objectidx);
1087                 return NULL;
1088         }
1089         mn->flags |= MF_OBJECT_WRITING;
1090         XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1091         return req;
1092 }
1093
1094 static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
1095 {
1096         void *dummy;
1097         struct peerd *peer = pr->peer;
1098         struct mapperd *mapper = __get_mapperd(peer);
1099         struct mapper_io *mio = __get_mapper_io(pr);
1100         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1101                                                         mapper->bportno, X_ALLOC);
1102         XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1103         if (!req){
1104                 XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1105                 goto out_err;
1106         }
1107         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1108         if (r < 0){
1109                 XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1110                 goto out_put;
1111         }
1112         char *target = xseg_get_target(peer->xseg, req);
1113         strncpy(target, mn->object, req->targetlen);
1114         req->op = X_DELETE;
1115         req->size = req->datalen;
1116         req->offset = 0;
1117         r = xseg_set_req_data(peer->xseg, req, pr);
1118         if (r < 0){
1119                 XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1120                 goto out_put;
1121         }
1122         __set_copyup_node(mio, req, mn);
1123         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1124         if (p == NoPort){
1125                 XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1126                 goto out_unset;
1127         }
1128         r = xseg_signal(peer->xseg, p);
1129         XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1130         return req;
1131
1132 out_unset:
1133         xseg_get_req_data(peer->xseg, req, &dummy);
1134 out_put:
1135         xseg_put_request(peer->xseg, req, pr->portno);
1136 out_err:
1137         XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1138         return NULL;
1139 }
1140
1141 static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1142 {
1143         void *dummy;
1144         struct peerd *peer = pr->peer;
1145         struct mapperd *mapper = __get_mapperd(peer);
1146         struct mapper_io *mio = __get_mapper_io(pr);
1147         struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1148                                                         mapper->mbportno, X_ALLOC);
1149         XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1150         if (!req){
1151                 XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1152                 goto out_err;
1153         }
1154         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1155         if (r < 0){
1156                 XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1157                 goto out_put;
1158         }
1159         char *target = xseg_get_target(peer->xseg, req);
1160         strncpy(target, map->volume, req->targetlen);
1161         req->op = X_DELETE;
1162         req->size = req->datalen;
1163         req->offset = 0;
1164         r = xseg_set_req_data(peer->xseg, req, pr);
1165         if (r < 0){
1166                 XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1167                 goto out_put;
1168         }
1169         __set_copyup_node(mio, req, NULL);
1170         xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1171         if (p == NoPort){
1172                 XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1173                 goto out_unset;
1174         }
1175         r = xseg_signal(peer->xseg, p);
1176         map->flags |= MF_MAP_DELETING;
1177         XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1178         return req;
1179
1180 out_unset:
1181         xseg_get_req_data(peer->xseg, req, &dummy);
1182 out_put:
1183         xseg_put_request(peer->xseg, req, pr->portno);
1184 out_err:
1185         XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1186         return  NULL;
1187 }
1188
1189
1190 static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1191 {
1192         struct map_node *mn = find_object(map, index);
1193         if (mn)
1194                 mn->ref++;
1195         return mn;
1196 }
1197
1198 static inline void put_mapnode(struct map_node *mn)
1199 {
1200         mn->ref--;
1201         if (!mn->ref){
1202                 //clean up mn
1203                 st_cond_destroy(mn->cond);
1204         }
1205 }
1206
1207 static inline void __get_map(struct map *map)
1208 {
1209         map->ref++;
1210 }
1211
1212 static inline void put_map(struct map *map)
1213 {
1214         struct map_node *mn;
1215         map->ref--;
1216         if (!map->ref){
1217                 XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1218                 //clean up map
1219                 uint64_t i;
1220                 for (i = 0; i < calc_map_obj(map); i++) {
1221                         mn = get_mapnode(map, i);
1222                         if (mn) {
1223                                 //make sure all pending operations on all objects are completed
1224                                 if (mn->flags & MF_OBJECT_NOT_READY){
1225                                         //this should never happen...
1226                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1227                                 }
1228                                 mn->flags &= MF_OBJECT_DESTROYED;
1229                                 put_mapnode(mn); //matchin mn->ref = 1 on mn init
1230                                 put_mapnode(mn); //matcing get_mapnode;
1231                                 //assert mn->ref == 0;
1232                         }
1233                 }
1234                 mn = find_object(map, 0);
1235                 if (mn)
1236                         free(mn);
1237                 XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1238                 free(map);
1239         }
1240 }
1241
1242 static struct map * create_map(struct mapperd *mapper, char *name,
1243                                 uint32_t namelen, uint32_t flags)
1244 {
1245         int r;
1246         if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1247                 XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1248                                         namelen, MAX_VOLUME_LEN);
1249                 return NULL;
1250         }
1251         struct map *m = malloc(sizeof(struct map));
1252         if (!m){
1253                 XSEGLOG2(&lc, E, "Cannot allocate map ");
1254                 goto out_err;
1255         }
1256         m->size = -1;
1257         if (flags & MF_ARCHIP){
1258                 strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1259                 strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1260                 m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1261                 m->volumelen = MAPPER_PREFIX_LEN + namelen;
1262                 m->version = 1; /* keep this hardcoded for now */
1263         }
1264         else {
1265                 strncpy(m->volume, name, namelen);
1266                 m->volume[namelen] = 0;
1267                 m->volumelen = namelen;
1268                 m->version = 0; /* version 0 should be pithos maps */
1269         }
1270         m->flags = 0;
1271         m->objects = xhash_new(3, INTEGER); 
1272         if (!m->objects){
1273                 XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1274                                 m->volume);
1275                 goto out_map;
1276         }
1277         m->ref = 1;
1278         m->waiters = 0;
1279         m->cond = st_cond_new(); //FIXME err check;
1280         r = insert_map(mapper, m);
1281         if (r < 0){
1282                 XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1283                 goto out_hash;
1284         }
1285
1286         return m;
1287
1288 out_hash:
1289         xhash_free(m->objects);
1290 out_map:
1291         XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1292         free(m);
1293 out_err:
1294         return NULL;
1295 }
1296
1297
1298
1299 void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1300 {
1301         struct peerd *peer = pr->peer;
1302         struct mapperd *mapper = __get_mapperd(peer);
1303         (void)mapper;
1304         struct mapper_io *mio = __get_mapper_io(pr);
1305         struct map_node *mn = __get_copyup_node(mio, req);
1306
1307         mio->del_pending--;
1308         if (req->state & XS_FAILED){
1309                 mio->err = 1;
1310         }
1311         signal_mapnode(mn);
1312         xseg_put_request(peer->xseg, req, pr->portno);
1313         signal_pr(pr);
1314 }
1315
1316 void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1317 {
1318         struct peerd *peer = pr->peer;
1319         struct mapperd *mapper = __get_mapperd(peer);
1320         (void)mapper;
1321         struct mapper_io *mio = __get_mapper_io(pr);
1322         struct map_node *mn = __get_copyup_node(mio, req);
1323         if (!mn){
1324                 XSEGLOG2(&lc, E, "Cannot get map node");
1325                 goto out_err;
1326         }
1327         __set_copyup_node(mio, req, NULL);
1328
1329         if (req->state & XS_FAILED){
1330                 XSEGLOG2(&lc, E, "Req failed");
1331                 mn->flags &= ~MF_OBJECT_COPYING;
1332                 mn->flags &= ~MF_OBJECT_WRITING;
1333                 goto out_err;
1334         }
1335         if (req->op == X_WRITE) {
1336                 char *target = xseg_get_target(peer->xseg, req);
1337                 (void)target;
1338                 //printf("handle object write replyi\n");
1339                 __set_copyup_node(mio, req, NULL);
1340                 //assert mn->flags & MF_OBJECT_WRITING
1341                 mn->flags &= ~MF_OBJECT_WRITING;
1342
1343                 struct map_node tmp;
1344                 char *data = xseg_get_data(peer->xseg, req);
1345                 map_to_object(&tmp, (unsigned char *) data);
1346                 mn->flags |= MF_OBJECT_EXIST;
1347                 if (mn->flags != MF_OBJECT_EXIST){
1348                         XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1349                         goto out_err;
1350                 }
1351                 //assert mn->flags & MF_OBJECT_EXIST
1352                 strncpy(mn->object, tmp.object, tmp.objectlen);
1353                 mn->object[tmp.objectlen] = 0;
1354                 mn->objectlen = tmp.objectlen;
1355                 XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1356                 mio->copyups--;
1357                 signal_mapnode(mn);
1358         } else if (req->op == X_COPY) {
1359         //      issue write_object;
1360                 mn->flags &= ~MF_OBJECT_COPYING;
1361                 struct map *map = mn->map;
1362                 if (!map){
1363                         XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1364                         goto out_err;
1365                 }
1366
1367                 /* construct a tmp map_node for writing purposes */
1368                 char *target = xseg_get_target(peer->xseg, req);
1369                 struct map_node newmn = *mn;
1370                 newmn.flags = MF_OBJECT_EXIST;
1371                 strncpy(newmn.object, target, req->targetlen);
1372                 newmn.object[req->targetlen] = 0;
1373                 newmn.objectlen = req->targetlen;
1374                 newmn.objectidx = mn->objectidx; 
1375                 struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1376                 if (!xreq){
1377                         XSEGLOG2(&lc, E, "Object write returned error for object %s"
1378                                         "\n\t of map %s [%llu]",
1379                                         mn->object, map->volume, (unsigned long long) mn->objectidx);
1380                         goto out_err;
1381                 }
1382                 mn->flags |= MF_OBJECT_WRITING;
1383                 __set_copyup_node (mio, xreq, mn);
1384
1385                 XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1386         } else {
1387                 //wtf??
1388                 ;
1389         }
1390
1391 out:
1392         xseg_put_request(peer->xseg, req, pr->portno);
1393         return;
1394
1395 out_err:
1396         mio->copyups--;
1397         XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1398         mio->err = 1;
1399         if (mn)
1400                 signal_mapnode(mn);
1401         goto out;
1402
1403 }
1404
1405 struct r2o {
1406         struct map_node *mn;
1407         uint64_t offset;
1408         uint64_t size;
1409 };
1410
1411 static int req2objs(struct peer_req *pr, struct map *map, int write)
1412 {
1413         int r = 0;
1414         struct peerd *peer = pr->peer;
1415         struct mapper_io *mio = __get_mapper_io(pr);
1416         char *target = xseg_get_target(peer->xseg, pr->req);
1417         uint32_t nr_objs = calc_nr_obj(pr->req);
1418         uint64_t size = sizeof(struct xseg_reply_map) + 
1419                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1420         uint32_t idx, i, ready;
1421         uint64_t rem_size, obj_index, obj_offset, obj_size; 
1422         struct map_node *mn;
1423         mio->copyups = 0;
1424         XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1425
1426         /* get map_nodes of request */
1427         struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1428         if (!mns){
1429                 XSEGLOG2(&lc, E, "Cannot allocate mns");
1430                 return -1;
1431         }
1432         idx = 0;
1433         rem_size = pr->req->size;
1434         obj_index = pr->req->offset / block_size;
1435         obj_offset = pr->req->offset & (block_size -1); //modulo
1436         obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1437         mn = get_mapnode(map, obj_index);
1438         if (!mn) {
1439                 XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1440                 r = -1;
1441                 goto out;
1442         }
1443         mns[idx].mn = mn;
1444         mns[idx].offset = obj_offset;
1445         mns[idx].size = obj_size;
1446         rem_size -= obj_size;
1447         while (rem_size > 0) {
1448                 idx++;
1449                 obj_index++;
1450                 obj_offset = 0;
1451                 obj_size = (rem_size >  block_size) ? block_size : rem_size;
1452                 rem_size -= obj_size;
1453                 mn = get_mapnode(map, obj_index);
1454                 if (!mn) {
1455                         XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1456                         r = -1;
1457                         goto out;
1458                 }
1459                 mns[idx].mn = mn;
1460                 mns[idx].offset = obj_offset;
1461                 mns[idx].size = obj_size;
1462         }
1463         if (write) {
1464                 ready = 0;
1465                 int can_wait = 0;
1466                 mio->cb=copyup_cb;
1467                 while (ready < (idx + 1)){
1468                         ready = 0;
1469                         for (i = 0; i < (idx+1); i++) {
1470                                 mn = mns[i].mn;
1471                                 //do copyups
1472                                 if (mn->flags & MF_OBJECT_NOT_READY) {
1473                                         if (can_wait){
1474                                                 wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1475                                                 if (mn->flags & MF_OBJECT_DELETED){
1476                                                         mio->err = 1;
1477                                                 }
1478                                                 if (mio->err){
1479                                                         XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1480                                                         if (!mio->copyups){
1481                                                                 r = -1;
1482                                                                 goto out;
1483                                                         }
1484                                                 }
1485                                         }
1486                                 }
1487                                 else if (!(mn->flags & MF_OBJECT_EXIST)) {
1488                                         //calc new_target, copy up object
1489                                         if (copyup_object(peer, mn, pr) == NULL){
1490                                                 XSEGLOG2(&lc, E, "Error in copy up object");
1491                                                 mio->err = 1;
1492                                         } else {
1493                                                 mio->copyups++;
1494                                         }
1495                                 } else {
1496                                         ready++;
1497                                 }
1498                         }
1499                         can_wait = 1;
1500                 }
1501                 /*
1502 pending_copyups:
1503                 while(mio->copyups > 0){
1504                         mio->cb = copyup_cb;
1505                         wait_on_pr(pr, 0);
1506                         ta--;
1507                         st_cond_wait(pr->cond);
1508                 }
1509                 */
1510         }
1511
1512         if (mio->err){
1513                 r = -1;
1514                 XSEGLOG2(&lc, E, "Mio->err");
1515                 goto out;
1516         }
1517
1518         /* resize request to fit reply */
1519         char buf[XSEG_MAX_TARGETLEN];
1520         strncpy(buf, target, pr->req->targetlen);
1521         r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1522         if (r < 0) {
1523                 XSEGLOG2(&lc, E, "Cannot resize request");
1524                 goto out;
1525         }
1526         target = xseg_get_target(peer->xseg, pr->req);
1527         strncpy(target, buf, pr->req->targetlen);
1528
1529         /* structure reply */
1530         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1531         reply->cnt = nr_objs;
1532         for (i = 0; i < (idx+1); i++) {
1533                 strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1534                 reply->segs[i].targetlen = mns[i].mn->objectlen;
1535                 reply->segs[i].offset = mns[i].offset;
1536                 reply->segs[i].size = mns[i].size;
1537         }
1538 out:
1539         for (i = 0; i < idx; i++) {
1540                 put_mapnode(mns[i].mn);
1541         }
1542         free(mns);
1543         return r;
1544 }
1545
1546 static int do_dropcache(struct peer_req *pr, struct map *map)
1547 {
1548         struct map_node *mn;
1549         struct peerd *peer = pr->peer;
1550         struct mapperd *mapper = __get_mapperd(peer);
1551         uint64_t i;
1552         XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1553         map->flags |= MF_MAP_DROPPING_CACHE;
1554         for (i = 0; i < calc_map_obj(map); i++) {
1555                 mn = get_mapnode(map, i);
1556                 if (mn) {
1557                         if (!(mn->flags & MF_OBJECT_DESTROYED)){
1558                                 //make sure all pending operations on all objects are completed
1559                                 if (mn->flags & MF_OBJECT_NOT_READY){
1560                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1561                                 }
1562                                 mn->flags &= MF_OBJECT_DESTROYED;
1563                         }
1564                         put_mapnode(mn);
1565                 }
1566         }
1567         map->flags &= ~MF_MAP_DROPPING_CACHE;
1568         map->flags |= MF_MAP_DESTROYED;
1569         remove_map(mapper, map);
1570         XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1571         put_map(map);   // put map here to destroy it (matches m->ref = 1 on map create)
1572         return 0;
1573 }
1574
1575 static int do_info(struct peer_req *pr, struct map *map)
1576 {
1577         struct peerd *peer = pr->peer;
1578         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1579         xinfo->size = map->size;
1580         return 0;
1581 }
1582
1583
1584 static int do_close(struct peer_req *pr, struct map *map)
1585 {
1586 //      struct peerd *peer = pr->peer;
1587 //      struct xseg_request *req;
1588         if (map->flags & MF_MAP_EXCLUSIVE) 
1589                 close_map(pr, map);
1590         return do_dropcache(pr, map);
1591 }
1592
1593 static int do_destroy(struct peer_req *pr, struct map *map)
1594 {
1595         uint64_t i;
1596         struct peerd *peer = pr->peer;
1597         struct mapper_io *mio = __get_mapper_io(pr);
1598         struct map_node *mn;
1599         struct xseg_request *req;
1600         
1601         XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1602         map->flags |= MF_MAP_DELETING;
1603         req = delete_map(pr, map);
1604         if (!req)
1605                 return -1;
1606         wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1607         if (req->state & XS_FAILED){
1608                 xseg_put_request(peer->xseg, req, pr->portno);
1609                 map->flags &= ~MF_MAP_DELETING;
1610                 return -1;
1611         }
1612         xseg_put_request(peer->xseg, req, pr->portno);
1613         //FIXME
1614         uint64_t nr_obj = calc_map_obj(map);
1615         uint64_t deleted = 0;
1616         while (deleted < nr_obj){ 
1617                 deleted = 0;
1618                 for (i = 0; i < nr_obj; i++){
1619                         mn = get_mapnode(map, i);
1620                         if (mn) {
1621                                 if (!(mn->flags & MF_OBJECT_DESTROYED)){
1622                                         if (mn->flags & MF_OBJECT_EXIST){
1623                                                 //make sure all pending operations on all objects are completed
1624                                                 if (mn->flags & MF_OBJECT_NOT_READY){
1625                                                         wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1626                                                 }
1627                                                 req = delete_object(pr, mn);
1628                                                 if (!req)
1629                                                         if (mio->del_pending){
1630                                                                 goto wait_pending;
1631                                                         } else {
1632                                                                 continue;
1633                                                         }
1634                                                 else {
1635                                                         mio->del_pending++;
1636                                                 }
1637                                         }
1638                                         mn->flags &= MF_OBJECT_DESTROYED;
1639                                 }
1640                                 put_mapnode(mn);
1641                         }
1642                         deleted++;
1643                 }
1644 wait_pending:
1645                 mio->cb = deletion_cb;
1646                 wait_on_pr(pr, mio->del_pending > 0);
1647         }
1648         mio->cb = NULL;
1649         map->flags &= ~MF_MAP_DELETING;
1650         XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1651         return do_close(pr, map);
1652 }
1653
1654 static int do_mapr(struct peer_req *pr, struct map *map)
1655 {
1656         struct peerd *peer = pr->peer;
1657         int r = req2objs(pr, map, 0);
1658         if  (r < 0){
1659                 XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1660                                 map->volume, 
1661                                 (unsigned long long) pr->req->offset, 
1662                                 (unsigned long long) (pr->req->offset + pr->req->size));
1663                 return -1;
1664         }
1665         XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1666                         map->volume, 
1667                         (unsigned long long) pr->req->offset, 
1668                         (unsigned long long) (pr->req->offset + pr->req->size));
1669         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1670                         (unsigned long long) pr->req->offset,
1671                         (unsigned long long) pr->req->size);
1672         char buf[XSEG_MAX_TARGETLEN+1];
1673         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1674         int i;
1675         for (i = 0; i < reply->cnt; i++) {
1676                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1677                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1678                 buf[reply->segs[i].targetlen] = 0;
1679                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1680                                 (unsigned long long) reply->segs[i].offset,
1681                                 (unsigned long long) reply->segs[i].size);
1682         }
1683         return 0;
1684 }
1685
1686 static int do_mapw(struct peer_req *pr, struct map *map)
1687 {
1688         struct peerd *peer = pr->peer;
1689         int r = req2objs(pr, map, 1);
1690         if  (r < 0){
1691                 XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1692                                 map->volume, 
1693                                 (unsigned long long) pr->req->offset, 
1694                                 (unsigned long long) (pr->req->offset + pr->req->size));
1695                 return -1;
1696         }
1697         XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1698                         map->volume, 
1699                         (unsigned long long) pr->req->offset, 
1700                         (unsigned long long) (pr->req->offset + pr->req->size));
1701         XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1702                         (unsigned long long) pr->req->offset,
1703                         (unsigned long long) pr->req->size);
1704         char buf[XSEG_MAX_TARGETLEN+1];
1705         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1706         int i;
1707         for (i = 0; i < reply->cnt; i++) {
1708                 XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1709                 strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1710                 buf[reply->segs[i].targetlen] = 0;
1711                 XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1712                                 (unsigned long long) reply->segs[i].offset,
1713                                 (unsigned long long) reply->segs[i].size);
1714         }
1715         return 0;
1716 }
1717
1718 //here map is the parent map
1719 static int do_clone(struct peer_req *pr, struct map *map)
1720 {
1721         /*
1722         FIXME check if clone map exists
1723         clonemap = get_map(pr, target, targetlen, MF_LOAD);
1724         if (clonemap)
1725                 do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1726                                         //      cache on non-exclusive opens or declare a NO_CACHE flag ?
1727                 return -1;
1728         */
1729
1730         int r;
1731         char buf[XSEG_MAX_TARGETLEN];
1732         struct peerd *peer = pr->peer;
1733         struct mapperd *mapper = __get_mapperd(peer);
1734         char *target = xseg_get_target(peer->xseg, pr->req);
1735         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1736         XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1737         struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
1738                                                 MF_ARCHIP);
1739         if (!clonemap) 
1740                 return -1;
1741
1742         if (xclone->size == -1)
1743                 clonemap->size = map->size;
1744         else
1745                 clonemap->size = xclone->size;
1746         if (clonemap->size < map->size){
1747                 target = xseg_get_target(peer->xseg, pr->req);
1748                 strncpy(buf, target, pr->req->targetlen);
1749                 buf[pr->req->targetlen] = 0;
1750                 XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1751                                 "\n\t for requested clone %s",
1752                                 (unsigned long long) xclone->size,
1753                                 (unsigned long long) map->size, buf);
1754                 goto out_err;
1755         }
1756
1757         //alloc and init map_nodes
1758         unsigned long c = clonemap->size/block_size + 1;
1759         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1760         if (!map_nodes){
1761                 goto out_err;
1762         }
1763         int i;
1764         for (i = 0; i < clonemap->size/block_size + 1; i++) {
1765                 struct map_node *mn = get_mapnode(map, i);
1766                 if (mn) {
1767                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1768                         map_nodes[i].objectlen = mn->objectlen;
1769                         put_mapnode(mn);
1770                 } else {
1771                         strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1772                         map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1773                 }
1774                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1775                 map_nodes[i].flags = 0;
1776                 map_nodes[i].objectidx = i;
1777                 map_nodes[i].map = clonemap;
1778                 map_nodes[i].ref = 1;
1779                 map_nodes[i].waiters = 0;
1780                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1781                 r = insert_object(clonemap, &map_nodes[i]);
1782                 if (r < 0){
1783                         XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1784                         goto out_err;
1785                 }
1786         }
1787         r = write_map(pr, clonemap);
1788         if (r < 0){
1789                 XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1790                 goto out_err;
1791         }
1792         return 0;
1793
1794 out_err:
1795         put_map(clonemap);
1796         return -1;
1797 }
1798
1799 static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1800 {
1801         int r, opened = 0;
1802         if (flags & MF_EXCLUSIVE){
1803                 r = open_map(pr, map, flags);
1804                 if (r < 0) {
1805                         if (flags & MF_FORCE){
1806                                 return -1;
1807                         }
1808                 } else {
1809                         opened = 1;
1810                 }
1811         }
1812         r = load_map(pr, map);
1813         if (r < 0 && opened){
1814                 close_map(pr, map);
1815         }
1816         return r;
1817 }
1818
1819 struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
1820                         uint32_t flags)
1821 {
1822         int r;
1823         struct peerd *peer = pr->peer;
1824         struct mapperd *mapper = __get_mapperd(peer);
1825         struct map *map = find_map_len(mapper, name, namelen, flags);
1826         if (!map){
1827                 if (flags & MF_LOAD){
1828                         map = create_map(mapper, name, namelen, flags);
1829                         if (!map)
1830                                 return NULL;
1831                         r = open_load_map(pr, map, flags);
1832                         if (r < 0){
1833                                 do_dropcache(pr, map);
1834                                 return NULL;
1835                         }
1836                 } else {
1837                         return NULL;
1838                 }
1839         } else if (map->flags & MF_MAP_DESTROYED){
1840                 return NULL;
1841         }
1842         __get_map(map);
1843         return map;
1844
1845 }
1846
1847 static int map_action(int (action)(struct peer_req *pr, struct map *map),
1848                 struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1849 {
1850         //struct peerd *peer = pr->peer;
1851         struct map *map;
1852 start:
1853         map = get_map(pr, name, namelen, flags);
1854         if (!map)
1855                 return -1;
1856         if (map->flags & MF_MAP_NOT_READY){
1857                 wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1858                 put_map(map);
1859                 goto start;
1860         }
1861         int r = action(pr, map);
1862         //always drop cache if map not read exclusively
1863         if (!(map->flags & MF_MAP_EXCLUSIVE))
1864                 do_dropcache(pr, map);
1865         signal_map(map);
1866         put_map(map);
1867         return r;
1868 }
1869
1870 void * handle_info(struct peer_req *pr)
1871 {
1872         struct peerd *peer = pr->peer;
1873         char *target = xseg_get_target(peer->xseg, pr->req);
1874         int r = map_action(do_info, pr, target, pr->req->targetlen,
1875                                 MF_ARCHIP|MF_LOAD);
1876         if (r < 0)
1877                 fail(peer, pr);
1878         else
1879                 complete(peer, pr);
1880         ta--;
1881         return NULL;
1882 }
1883
1884 void * handle_clone(struct peer_req *pr)
1885 {
1886         int r;
1887         struct peerd *peer = pr->peer;
1888         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1889         if (!xclone) {
1890                 r = -1;
1891                 goto out;
1892         }
1893         if (xclone->targetlen){
1894                 //support clone only from pithos
1895                 r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
1896                                         MF_LOAD);
1897         } else {
1898                 if (!xclone->size){
1899                         r = -1;
1900                 } else {
1901                         //FIXME
1902                         struct map *map;
1903                         char *target = xseg_get_target(peer->xseg, pr->req);
1904                         XSEGLOG2(&lc, I, "Creating volume");
1905                         map = get_map(pr, target, pr->req->targetlen,
1906                                                 MF_ARCHIP|MF_LOAD);
1907                         if (map){
1908                                 XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1909                                 if (map->ref <= 2) //initial one + one ref from __get_map
1910                                         do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1911                                 put_map(map); //matches get_map
1912                                 r = -1;
1913                                 goto out;
1914                         }
1915                         //create a new empty map of size
1916                         map = create_map(mapper, target, pr->req->targetlen,
1917                                                 MF_ARCHIP);
1918                         if (!map){
1919                                 r = -1;
1920                                 goto out;
1921                         }
1922                         map->size = xclone->size;
1923                         //populate_map with zero objects;
1924                         uint64_t nr_objs = xclone->size / block_size;
1925                         if (xclone->size % block_size)
1926                                 nr_objs++;
1927
1928                         struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1929                         if (!map_nodes){
1930                                 do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1931                                 r = -1;
1932                                 goto out;
1933                         }
1934                         uint64_t i;
1935                         for (i = 0; i < nr_objs; i++) {
1936                                 strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1937                                 map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1938                                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1939                                 map_nodes[i].flags = 0;
1940                                 map_nodes[i].objectidx = i;
1941                                 map_nodes[i].map = map;
1942                                 map_nodes[i].ref = 1;
1943                                 map_nodes[i].waiters = 0;
1944                                 map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1945                                 r = insert_object(map, &map_nodes[i]);
1946                                 if (r < 0){
1947                                         do_dropcache(pr, map);
1948                                         r = -1;
1949                                         goto out;
1950                                 }
1951                         }
1952                         r = write_map(pr, map);
1953                         if (r < 0){
1954                                 XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1955                                 do_dropcache(pr, map);
1956                                 goto out;
1957                         }
1958                         XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1959                         r = 0;
1960                         do_dropcache(pr, map); //drop cache here for consistency
1961                 }
1962         }
1963 out:
1964         if (r < 0)
1965                 fail(peer, pr);
1966         else
1967                 complete(peer, pr);
1968         ta--;
1969         return NULL;
1970 }
1971
1972 void * handle_mapr(struct peer_req *pr)
1973 {
1974         struct peerd *peer = pr->peer;
1975         char *target = xseg_get_target(peer->xseg, pr->req);
1976         int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1977                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1978         if (r < 0)
1979                 fail(peer, pr);
1980         else
1981                 complete(peer, pr);
1982         ta--;
1983         return NULL;
1984 }
1985
1986 void * handle_mapw(struct peer_req *pr)
1987 {
1988         struct peerd *peer = pr->peer;
1989         char *target = xseg_get_target(peer->xseg, pr->req);
1990         int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1991                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1992         if (r < 0)
1993                 fail(peer, pr);
1994         else
1995                 complete(peer, pr);
1996         XSEGLOG2(&lc, D, "Ta: %d", ta);
1997         ta--;
1998         return NULL;
1999 }
2000
2001 void * handle_destroy(struct peer_req *pr)
2002 {
2003         struct peerd *peer = pr->peer;
2004         char *target = xseg_get_target(peer->xseg, pr->req);
2005         int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2006                                 MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2007         if (r < 0)
2008                 fail(peer, pr);
2009         else
2010                 complete(peer, pr);
2011         ta--;
2012         return NULL;
2013 }
2014
2015 void * handle_close(struct peer_req *pr)
2016 {
2017         struct peerd *peer = pr->peer;
2018         char *target = xseg_get_target(peer->xseg, pr->req);
2019         //here we do not want to load
2020         int r = map_action(do_close, pr, target, pr->req->targetlen,
2021                                 MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2022         if (r < 0)
2023                 fail(peer, pr);
2024         else
2025                 complete(peer, pr);
2026         ta--;
2027         return NULL;
2028 }
2029
2030 int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
2031                         struct xseg_request *req)
2032 {
2033         //struct mapperd *mapper = __get_mapperd(peer);
2034         struct mapper_io *mio = __get_mapper_io(pr);
2035         void *(*action)(struct peer_req *) = NULL;
2036
2037         mio->state = ACCEPTED;
2038         mio->err = 0;
2039         mio->cb = NULL;
2040         switch (pr->req->op) {
2041                 /* primary xseg operations of mapper */
2042                 case X_CLONE: action = handle_clone; break;
2043                 case X_MAPR: action = handle_mapr; break;
2044                 case X_MAPW: action = handle_mapw; break;
2045 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2046                 case X_INFO: action = handle_info; break;
2047                 case X_DELETE: action = handle_destroy; break;
2048                 case X_CLOSE: action = handle_close; break;
2049                 default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2050         }
2051         if (action){
2052                 ta++;
2053                 mio->active = 1;
2054                 st_thread_create(action, pr, 0, 0);
2055         }
2056         return 0;
2057
2058 }
2059
2060 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2061                 enum dispatch_reason reason)
2062 {
2063         struct mapperd *mapper = __get_mapperd(peer);
2064         (void) mapper;
2065         struct mapper_io *mio = __get_mapper_io(pr);
2066         (void) mio;
2067
2068
2069         if (reason == dispatch_accept)
2070                 dispatch_accepted(peer, pr, req);
2071         else {
2072                 if (mio->cb){
2073                         mio->cb(pr, req);
2074                 } else { 
2075                         signal_pr(pr);
2076                 }
2077         }
2078         return 0;
2079 }
2080
2081 int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2082 {
2083         int i;
2084
2085         //FIXME error checks
2086         struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2087         peer->priv = mapperd;
2088         mapper = mapperd;
2089         mapper->hashmaps = xhash_new(3, STRING);
2090
2091         printf("%llu \n", MAX_VOLUME_SIZE);
2092         for (i = 0; i < peer->nr_ops; i++) {
2093                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2094                 mio->copyups_nodes = xhash_new(3, INTEGER);
2095                 mio->copyups = 0;
2096                 mio->err = 0;
2097                 mio->active = 0;
2098                 peer->peer_reqs[i].priv = mio;
2099         }
2100
2101         for (i = 0; i < argc; i++) {
2102                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2103                         mapper->bportno = atoi(argv[i+1]);
2104                         i += 1;
2105                         continue;
2106                 }
2107                 if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2108                         mapper->mbportno = atoi(argv[i+1]);
2109                         i += 1;
2110                         continue;
2111                 }
2112                 /* enforce only one thread */
2113                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
2114                         int t = atoi(argv[i+1]);
2115                         if (t != 1) {
2116                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2117                                 return -1;
2118                         }
2119                         i += 1;
2120                         continue;
2121                 }
2122         }
2123
2124         const struct sched_param param = { .sched_priority = 99 };
2125         sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2126
2127
2128 //      test_map(peer);
2129
2130         return 0;
2131 }
2132
2133 void print_obj(struct map_node *mn)
2134 {
2135         fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2136                         (unsigned long long) mn->objectidx, mn->object, 
2137                         (unsigned int) mn->objectlen, 
2138                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2139 }
2140
2141 void print_map(struct map *m)
2142 {
2143         uint64_t nr_objs = m->size/block_size;
2144         if (m->size % block_size)
2145                 nr_objs++;
2146         fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2147                         m->volume, m->volumelen, 
2148                         (unsigned long long) m->size, 
2149                         (unsigned long long) nr_objs,
2150                         m->version);
2151         uint64_t i;
2152         struct map_node *mn;
2153         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2154                 return;
2155         for (i = 0; i < nr_objs; i++) {
2156                 mn = find_object(m, i);
2157                 if (!mn){
2158                         printf("object idx [%llu] not found!\n", (unsigned long long) i);
2159                         continue;
2160                 }
2161                 print_obj(mn);
2162         }
2163 }
2164
2165 /*
2166 void test_map(struct peerd *peer)
2167 {
2168         int i,j, ret;
2169         //struct sha256_ctx sha256ctx;
2170         unsigned char buf[SHA256_DIGEST_SIZE];
2171         char buf_new[XSEG_MAX_TARGETLEN + 20];
2172         struct map *m = malloc(sizeof(struct map));
2173         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2174         m->volume[XSEG_MAX_TARGETLEN] = 0;
2175         strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2176         buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2177         m->volumelen = XSEG_MAX_TARGETLEN;
2178         m->size = 100*block_size;
2179         m->objects = xhash_new(3, INTEGER);
2180         struct map_node *map_node = calloc(100, sizeof(struct map_node));
2181         for (i = 0; i < 100; i++) {
2182                 sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2183                 gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2184                 
2185                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2186                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2187                 }
2188                 map_node[i].objectidx = i;
2189                 map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2190                 map_node[i].flags = MF_OBJECT_EXIST;
2191                 ret = insert_object(m, &map_node[i]);
2192         }
2193
2194         char *data = malloc(block_size);
2195         mapheader_to_map(m, data);
2196         uint64_t pos = mapheader_size;
2197
2198         for (i = 0; i < 100; i++) {
2199                 map_node = find_object(m, i);
2200                 if (!map_node){
2201                         printf("no object node %d \n", i);
2202                         exit(1);
2203                 }
2204                 object_to_map(data+pos, map_node);
2205                 pos += objectsize_in_map;
2206         }
2207 //      print_map(m);
2208
2209         struct map *m2 = malloc(sizeof(struct map));
2210         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2211         m->volume[XSEG_MAX_TARGETLEN] = 0;
2212         m->volumelen = XSEG_MAX_TARGETLEN;
2213
2214         m2->objects = xhash_new(3, INTEGER);
2215         ret = read_map(peer, m2, data);
2216 //      print_map(m2);
2217
2218         int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2219         ssize_t r, sum = 0;
2220         while (sum < block_size) {
2221                 r = write(fd, data + sum, block_size -sum);
2222                 if (r < 0){
2223                         perror("write");
2224                         printf("write error\n");
2225                         exit(1);
2226                 } 
2227                 sum += r;
2228         }
2229         close(fd);
2230         map_node = find_object(m, 0);
2231         free(map_node);
2232         free(m);
2233 }
2234 */