fix various bugs in mt-mapperd.
[archipelago] / xseg / peers / user / mt-mapperd.c
1 #include <stdio.h>
2 #include <unistd.h>
3 #include <sys/types.h>
4 #include <pthread.h>
5 #include <xseg/xseg.h>
6 #include <mpeer.h>
7 #include <time.h>
8 #include <sys/sha256.h>
9 #include <xtypes/xlock.h>
10 #include <xtypes/xhash.h>
11 #include <xseg/protocol.h>
12 #include <sys/stat.h>
13 #include <fcntl.h>
14
15 #define MF_PENDING 1
16
17 /* hex representation of sha256 value takes up double the sha256 size */
18 #define XSEG_MAX_TARGET_LEN (SHA256_DIGEST_SIZE << 1)
19
20 #define block_size (1<<20)
21 #define objectsize_in_map (1 + XSEG_MAX_TARGET_LEN) /* transparency byte + max object len */
22 #define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
23
24 #define MF_OBJECT_EXIST         (1 << 0)
25 #define MF_OBJECT_COPYING       (1 << 1)
26
27 char *magic_string = "This a magic string. Please hash me";
28 unsigned char magic_sha256[SHA256_DIGEST_SIZE]; /* sha256 hash value of magic string */
29 char zero_block[SHA256_DIGEST_SIZE * 2 + 1];    /* hexlified sha256 hash value of a block full of zeros */
30
31 struct map_node {
32         uint32_t flags;
33         uint32_t objectidx;
34         uint32_t objectlen;
35         char object[XSEG_MAX_TARGET_LEN + 1];   /* NULL terminated string */
36         struct xq pending;                      /* pending peer_reqs on this object */
37 };
38
39 #define MF_MAP_LOADING          (1 << 0)
40 #define MF_MAP_DESTROYED        (1 << 1)
41
42 struct map {
43         uint32_t flags;
44         uint64_t size;
45         uint32_t volumelen;
46         char volume[XSEG_MAX_TARGET_LEN + 1]; /* NULL terminated string */
47         xhash_t *objects;       /* obj_index --> map_node */
48         struct xq pending;      /* pending peer_reqs on this map */
49 };
50
51 struct mapperd {
52         xport bportno;
53         xhash_t *hashmaps; // hash_function(target) --> struct map
54 };
55
56 struct mapper_io {
57         volatile uint32_t copyups;      /* nr of copyups pending, issued by this mapper io */
58         xhash_t *copyups_nodes;         /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
59         int err;                        /* error flag */
60 };
61
62 /*
63  * Helper functions
64  */
65
66 static inline struct mapperd * __get_mapperd(struct peerd *peer)
67 {
68         return (struct mapperd *) peer->priv;
69 }
70
71 static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
72 {
73         return (struct mapper_io *) pr->priv;
74 }
75
76 static inline uint64_t calc_map_obj(struct map *map)
77 {
78         uint64_t nr_objs = map->size / block_size;
79         if (map->size % block_size)
80                 nr_objs++;
81         return nr_objs;
82 }
83
84 static uint32_t calc_nr_obj(struct xseg_request *req)
85 {
86         unsigned int r = 1;
87         uint64_t rem_size = req->size;
88         uint64_t obj_offset = req->offset & (block_size -1); //modulo
89         uint64_t obj_size =  (rem_size > block_size) ? block_size - obj_offset : rem_size;
90         rem_size -= obj_size;
91         while (rem_size > 0) {
92                 obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
93                 rem_size -= obj_size;
94                 r++;
95         }
96
97         return r;
98 }
99
100 /*
101  * Maps handling functions
102  */
103
104 static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
105 {
106         int r;
107         struct map *m = NULL;
108         char buf[XSEG_MAX_TARGET_LEN+1];
109         //assert targetlen <= XSEG_MAX_TARGET_LEN
110         strncpy(buf, target, targetlen);
111         buf[targetlen] = 0;
112         r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
113         if (r < 0)
114                 return NULL;
115         return m;
116 }
117
118
119 static int insert_map(struct mapperd *mapper, struct map *map)
120 {
121         int r = -1;
122         
123         if (find_map(mapper, map->volume, map->volumelen)){
124                 //printf("map found in insert map\n");
125                 goto out;
126         }
127         
128         r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
129         if (r == -XHASH_ERESIZE) {
130                 xhashidx shift = xhash_grow_size_shift(map->objects);
131                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
132                 if (!new_hashmap)
133                         goto out;
134                 mapper->hashmaps = new_hashmap;
135                 r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
136         }
137 out:
138         return r;
139 }
140
141 static int remove_map(struct mapperd *mapper, struct map *map)
142 {
143         int r = -1;
144         
145         //assert no pending pr on map
146         
147         r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
148         if (r == -XHASH_ERESIZE) {
149                 xhashidx shift = xhash_shrink_size_shift(map->objects);
150                 xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
151                 if (!new_hashmap)
152                         goto out;
153                 mapper->hashmaps = new_hashmap;
154                 r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
155         }
156 out:
157         return r;
158 }
159
160 /* async map load */
161 static int load_map(struct peerd *peer, struct peer_req *pr, char *target, uint32_t targetlen)
162 {
163         int r;
164         xport p;
165         struct xseg_request *req;
166         struct mapperd *mapper = __get_mapperd(peer);
167         void *dummy;
168         //printf("Loading map\n");
169
170         struct map *m = find_map(mapper, target, targetlen);
171         if (!m) {
172                 m = malloc(sizeof(struct map));
173                 if (!m)
174                         goto out_err;
175                 m->size = -1;
176                 strncpy(m->volume, target, targetlen);
177                 m->volume[XSEG_MAX_TARGET_LEN] = 0;
178                 m->volumelen = targetlen;
179                 m->flags = MF_MAP_LOADING;
180                 xqindex *qidx = xq_alloc_empty(&m->pending, peer->nr_ops);
181                 if (!qidx) {
182                         goto out_map;
183                 }
184                 m->objects = xhash_new(3, INTEGER); //FIXME err_check;
185                 if (!m->objects)
186                         goto out_q;
187                 __xq_append_tail(&m->pending, (xqindex) pr);
188         } else {
189                 goto map_exists;
190         }
191
192         r = insert_map(mapper, m);
193         if (r < 0)  
194                 goto out_hash;
195         
196         //printf("Loading map: preparing req\n");
197
198         req = xseg_get_request(peer->xseg, peer->portno, mapper->bportno, X_ALLOC);
199         if (!req)
200                 goto out_fail;
201
202         r = xseg_prep_request(peer->xseg, req, targetlen, block_size);
203         if (r < 0)
204                 goto out_put;
205
206         char *reqtarget = xseg_get_target(peer->xseg, req);
207         if (!reqtarget)
208                 goto out_put;
209         strncpy(reqtarget, target, targetlen);
210         req->op = X_READ;
211         req->size = block_size;
212         req->offset = 0;
213         r = xseg_set_req_data(peer->xseg, req, pr);
214         if (r < 0)
215                 goto out_put;
216         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
217         if (p == NoPort) 
218                 goto out_unset;
219         r = xseg_signal(peer->xseg, p);
220         
221         //printf("Loading map: request issued\n");
222         return 0;
223
224 out_unset:
225         xseg_get_req_data(peer->xseg, req, &dummy);
226 out_put:
227         xseg_put_request(peer->xseg, req, peer->portno);
228
229 out_fail:
230         remove_map(mapper, m);
231         xqindex idx;
232         while((idx = __xq_pop_head(&m->pending)) != Noneidx) {
233                 fail(peer, (struct peer_req *) idx);
234         }
235
236 out_hash:
237         xhash_free(m->objects);
238 out_q:
239         xq_free(&m->pending);
240 out_map:
241         free(m);
242 out_err:
243         return -1;
244
245 map_exists:
246         //assert map loading when this is reached
247         if (m->flags & MF_MAP_LOADING) {
248                 __xq_append_tail(&m->pending, (xqindex) pr);
249         }
250         else {
251                 dispatch(peer, pr, pr->req);
252         }
253         return 0;
254 }
255
256
257 static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
258                                 char *target, uint32_t targetlen, struct map **m)
259 {
260         struct mapperd *mapper = __get_mapperd(peer);
261         int r;
262         //printf("find map or load\n");
263         *m = find_map(mapper, target, targetlen);
264         if (*m) {
265                 //printf("map found\n");
266                 if ((*m)->flags & MF_MAP_LOADING) {
267                         __xq_append_tail(&(*m)->pending, (xqindex) pr);
268                         //printf("Map loading\n");
269                         return MF_PENDING;
270                 } else {
271                         //printf("Map returned\n");
272                         return 0;
273                 }
274         }
275         r = load_map(peer, pr, target, targetlen);
276         if (r < 0)
277                 return -1; //error
278         return MF_PENDING;      
279 }
280
281 /*
282  * Object handling functions
283  */
284
285 struct map_node *find_object(struct map *map, uint64_t obj_index)
286 {
287         struct map_node *mn;
288         int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
289         if (r < 0)
290                 return NULL;
291         return mn;
292 }
293
294 static int insert_object(struct map *map, struct map_node *mn)
295 {
296         //FIXME no find object first
297         int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
298         if (r == -XHASH_ERESIZE) {
299                 unsigned long shift = xhash_grow_size_shift(map->objects);
300                 map->objects = xhash_resize(map->objects, shift, NULL);
301                 if (!map->objects)
302                         return -1;
303                 r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
304         }
305         return r;
306 }
307
308
309 /*
310  * map read/write functions 
311  */
312 static inline void pithosmap_to_object(struct map_node *mn, char *buf)
313 {
314         int i;
315         //hexlify sha256 value
316         for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
317                 sprintf(mn->object, "%02x", buf[i]);
318         }
319
320         mn->object[XSEG_MAX_TARGET_LEN] = 0;
321         mn->objectlen = strlen(mn->object);
322         mn->flags = 0;
323 }
324
325 static inline void map_to_object(struct map_node *mn, char *buf)
326 {
327         char c = buf[0];
328         mn->flags = 0;
329         if (c)
330                 mn->flags |= MF_OBJECT_EXIST;
331         memcpy(mn->object, buf+1, XSEG_MAX_TARGET_LEN);
332         mn->object[XSEG_MAX_TARGET_LEN] = 0;
333         mn->objectlen = strlen(mn->object);
334 }
335
336 static inline void object_to_map(char* buf, struct map_node *mn)
337 {
338         buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
339         memcpy(buf+1, mn->object, mn->objectlen);
340         memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGET_LEN - mn->objectlen); //zero out the rest of the buffer
341 }
342
343 static inline void mapheader_to_map(struct map *m, char *buf)
344 {
345         uint64_t pos = 0;
346         memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
347         pos += SHA256_DIGEST_SIZE;
348         memcpy(buf + pos, &m->size, sizeof(m->size));
349         pos += sizeof(m->size);
350 }
351
352
353 static int object_write(struct peerd *peer, struct peer_req *pr, struct map_node *mn)
354 {
355         void *dummy;
356         struct mapperd *mapper = __get_mapperd(peer);
357         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno,
358                                                         mapper->bportno, X_ALLOC);
359         if (!req)
360                 goto out_err;
361         int r = xseg_prep_request(peer->xseg, req, mn->objectlen, objectsize_in_map);
362         if (r < 0)
363                 goto out_put;
364         char *target = xseg_get_target(peer->xseg, req);
365         strncpy(target, mn->object, mn->objectlen);
366         req->size = objectsize_in_map;
367         req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
368         req->op = X_WRITE;
369         char *data = xseg_get_data(peer->xseg, req);
370         object_to_map(data, mn);
371
372         r = xseg_set_req_data(peer->xseg, req, pr);
373         if (r < 0)
374                 goto out_put;
375         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
376         if (p == NoPort)
377                 goto out_unset;
378         r = xseg_signal(peer->xseg, p);
379
380         return MF_PENDING;
381
382 out_unset:
383         xseg_get_req_data(peer->xseg, req, &dummy);
384 out_put:
385         xseg_put_request(peer->xseg, req, peer->portno);
386 out_err:
387         return -1;
388 }
389
390 static int map_write(struct peerd *peer, struct peer_req* pr, struct map *map)
391 {
392         void *dummy;
393         struct mapperd *mapper = __get_mapperd(peer);
394         struct map_node *mn;
395         uint64_t i, pos, max_objidx = calc_map_obj(map);
396         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
397                                                         mapper->bportno, X_ALLOC);
398         if (!req)
399                 goto out_err;
400         int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
401                                         mapheader_size + max_objidx * objectsize_in_map);
402         if (r < 0)
403                 goto out_put;
404         char *data = xseg_get_data(peer->xseg, req);
405         mapheader_to_map(map, data);
406         pos = mapheader_size;
407
408         if (map->size % block_size)
409                 max_objidx++;
410         for (i = 0; i < max_objidx; i++) {
411                 mn = find_object(map, i);
412                 if (!mn)
413                         goto out_put;
414                 object_to_map(data+pos, mn);
415                 pos += objectsize_in_map;
416         }
417         r = xseg_set_req_data(peer->xseg, req, pr);
418         if (r < 0)
419                 goto out_put;
420         xport p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
421         if (p == NoPort)
422                 goto out_unset;
423         r = xseg_signal(peer->xseg, p);
424         return MF_PENDING;
425
426 out_unset:
427         xseg_get_req_data(peer->xseg, req, &dummy);
428 out_put:
429         xseg_put_request(peer->xseg, req, peer->portno);
430 out_err:
431         return -1;
432 }
433
434 static int read_map (struct peerd *peer, struct map *map, char *buf)
435 {
436         char nulls[SHA256_DIGEST_SIZE];
437         memset(nulls, 0, SHA256_DIGEST_SIZE);
438
439         int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
440         if (r) {
441                 //read error;
442                 return -1;
443         }
444         //type 1, our type, type 0 pithos map
445         int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
446         uint64_t pos;
447         uint64_t i, nr_objs;
448         struct map_node *map_node;
449         if (type) {
450                 pos = SHA256_DIGEST_SIZE;
451                 map->size = *(uint64_t *) (buf + pos);
452                 pos += sizeof(uint64_t);
453                 nr_objs = map->size / block_size;
454                 if (map->size % block_size)
455                         nr_objs++;
456                 map_node = calloc(nr_objs, sizeof(struct map_node));
457                 if (!map_node)
458                         return -1;
459
460                 for (i = 0; i < nr_objs; i++) {
461                         map_node[i].objectidx = i;
462                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
463                         map_to_object(&map_node[i], buf + pos);
464                         pos += objectsize_in_map;
465                         r = insert_object(map, &map_node[i]); //FIXME error check
466                 }
467         } else {
468                 pos = 0;
469                 uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
470                 map_node = calloc(max_nr_objs, sizeof(struct map_node));
471                 if (!map_node)
472                         return -1;
473                 for (i = 0; i < max_nr_objs; i++) {
474                         if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
475                                 break;
476                         map_node[i].objectidx = i;
477                         xqindex *qidx = xq_alloc_empty(&map_node[i].pending, peer->nr_ops); //FIXME error check
478                         pithosmap_to_object(&map_node[i], buf + pos);
479                         pos += SHA256_DIGEST_SIZE; 
480                         r = insert_object(map, &map_node[i]); //FIXME error check
481                 }
482                 map->size = i * block_size; 
483         }
484         return 0;
485
486         //FIXME cleanup on error
487 }
488
489 /*
490  * copy up functions
491  */
492
493 static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
494 {
495         int r = 0;
496         if (mn){
497                 r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
498                 if (r == -XHASH_ERESIZE) {
499                         xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
500                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
501                         if (!new_hashmap)
502                                 goto out;
503                         mio->copyups_nodes = new_hashmap;
504                         r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
505                 }
506         }
507         else {
508                 r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
509                 if (r == -XHASH_ERESIZE) {
510                         xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
511                         xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
512                         if (!new_hashmap)
513                                 goto out;
514                         mio->copyups_nodes = new_hashmap;
515                         r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
516                 }
517         }
518 out:
519         return r;
520 }
521
522 static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
523 {
524         struct map_node *mn;
525         int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
526         if (r < 0)
527                 return NULL;
528         return mn;
529 }
530
531 static int copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
532 {
533         struct mapperd *mapper = __get_mapperd(peer);
534         struct mapper_io *mio = __get_mapper_io(pr);
535         void *dummy;
536         int r = -1, i;
537         xport p;
538         struct sha256_ctx sha256ctx;
539         uint32_t newtargetlen;
540         char new_target[XSEG_MAX_TARGET_LEN + 1]; 
541         unsigned char buf[SHA256_DIGEST_SIZE];  //assert sha256_digest_size(32) <= MAXTARGETLEN 
542         char new_object[XSEG_MAX_TARGET_LEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
543         strncpy(new_object, mn->object, mn->objectlen);
544         sprintf(new_object + mn->objectlen, "%u", mn->objectidx); //sprintf adds null termination
545         new_object[XSEG_MAX_TARGET_LEN + 19] = 0;
546
547
548         /* calculate new object name */
549         sha256_init_ctx(&sha256ctx);
550         sha256_process_bytes(new_object, strlen(new_object), &sha256ctx);
551         sha256_finish_ctx(&sha256ctx, buf);
552         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
553                 sprintf (new_target + 2*i, "%02x", buf[i]);
554         newtargetlen = SHA256_DIGEST_SIZE  * 2;
555
556
557         struct xseg_request *req = xseg_get_request(peer->xseg, peer->portno, 
558                                                         mapper->bportno, X_ALLOC);
559         if (!req)
560                 goto out;
561         r = xseg_prep_request(peer->xseg, req, newtargetlen, 
562                                 sizeof(struct xseg_request_copy));
563         if (r < 0)
564                 goto out_put;
565
566         char *target = xseg_get_target(peer->xseg, req);
567         strncpy(target, new_target, newtargetlen);
568
569         struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
570         strncpy(xcopy->target, mn->object, mn->objectlen);
571         xcopy->target[mn->objectlen] = 0;
572
573         req->offset = 0;
574         req->size = block_size;
575         req->op = X_COPY;
576         r = xseg_set_req_data(peer->xseg, req, pr);
577         if (r<0)
578                 goto out_put;
579         r = __set_copyup_node(mio, req, mn);
580         p = xseg_submit(peer->xseg, req, peer->portno, X_ALLOC);
581         if (p == NoPort) {
582                 r = -1;
583                 goto out_unset;
584         }
585         xseg_signal(peer->xseg, p);
586
587         r = 0;
588 out:
589         return r;
590
591 out_unset:
592         xseg_get_req_data(peer->xseg, req, &dummy);
593 out_put:
594         xseg_put_request(peer->xseg, req, peer->portno);
595         goto out;
596
597 }
598
599 /*
600  * request handling functions
601  */
602
603 static int handle_mapread(struct peerd *peer, struct peer_req *pr, 
604                                 struct xseg_request *req)
605 {
606         int r;
607         xqindex idx;
608         struct mapperd *mapper = __get_mapperd(peer);
609         //assert req->op = X_READ;
610         char *target = xseg_get_target(peer->xseg, req);
611         struct map *map = find_map(mapper, target, req->targetlen);
612         if (!map)
613                 goto out_err;
614         //assert map->flags & MF_MAP_LOADING
615
616         if (req->state & XS_FAILED)
617                 goto out_fail;
618
619         char *data = xseg_get_data(peer->xseg, req);
620         r = read_map(peer, map, data);
621         if (r < 0)
622                 goto out_fail;
623         
624         xseg_put_request(peer->xseg, req, peer->portno);
625         map->flags &= ~MF_MAP_LOADING;
626         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
627                 struct peer_req *preq = (struct peer_req *) idx;
628                 dispatch(peer, preq, preq->req);
629         }
630         return 0;
631
632 out_fail:
633         xseg_put_request(peer->xseg, req, peer->portno);
634         map->flags &= ~MF_MAP_LOADING;
635         while((idx = __xq_pop_head(&map->pending)) != Noneidx){
636                 struct peer_req *preq = (struct peer_req *) idx;
637                 fail(peer, preq);
638         }
639         remove_map(mapper, map);
640         free(map);
641         return 0;
642
643 out_err:
644         xseg_put_request(peer->xseg, req, peer->portno);
645         return -1;
646 }
647
648 static int handle_clone(struct peerd *peer, struct peer_req *pr, 
649                                 struct xseg_request *req)
650 {
651         struct mapperd *mapper = __get_mapperd(peer);
652         struct mapper_io *mio = __get_mapper_io(pr);
653         (void) mio;
654         struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
655         if (!xclone) {
656                 goto out_err;
657         }
658         struct map *map;
659         int r = find_or_load_map(peer, pr, xclone->target, strlen(xclone->target), &map);
660         if (r < 0)
661                 goto out_err;
662         else if (r == MF_PENDING)
663                 return 0;
664         
665         if (map->flags & MF_MAP_DESTROYED) {
666                 fail(peer, pr);
667                 return 0;
668         }
669
670         //alloc and init struct map
671         struct map *clonemap = malloc(sizeof(struct map));
672         if (!clonemap) {
673                 goto out_err;
674         }
675         clonemap->objects = xhash_new(3, INTEGER);
676         if (!clonemap->objects){
677                 goto out_err_clonemap;
678         }
679         xqindex *qidx = xq_alloc_empty(&clonemap->pending, peer->nr_ops);
680         if (!qidx)
681                 goto out_err_objhash;
682         clonemap->size = xclone->size;
683         clonemap->flags = 0;
684         char *target = xseg_get_target(peer->xseg, pr->req);
685         strncpy(clonemap->volume, target, pr->req->targetlen);
686         clonemap->volumelen = pr->req->targetlen;
687         clonemap->volume[clonemap->volumelen] = 0; //NULL TERMINATE
688
689         //alloc and init map_nodes
690         unsigned long c = xclone->size/block_size + 1;
691         struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
692         if (!map_nodes){
693                 goto out_err_q;
694         }
695         int i;
696         for (i = 0; i < xclone->size/block_size + 1; i++) {
697                 struct map_node *mn = find_object(map, i);
698                 if (mn) {
699                         strncpy(map_nodes[i].object, mn->object, mn->objectlen);
700                         map_nodes[i].objectlen = mn->objectlen;
701                 } else {
702                         strncpy(map_nodes[i].object, zero_block, strlen(zero_block));
703                         map_nodes[i].objectlen = strlen(zero_block);
704                 }
705                 map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
706                 map_nodes[i].flags = 0;
707                 map_nodes[i].objectidx = i;
708                 xq_alloc_empty(&map_nodes[i].pending, peer->nr_ops);
709                 r = insert_object(clonemap, &map_nodes[i]);
710                 if (r < 0)
711                         goto out_free_all;
712         }
713         //insert map
714         r = insert_map(mapper, clonemap);
715         if ( r < 0) {
716                 goto out_free_all;
717         }
718
719         complete(peer, pr);
720         return 0;
721
722 out_free_all:
723         //FIXME not freeing allocated queues of map_nodes
724         free(map_nodes);
725 out_err_q:
726         xq_free(&clonemap->pending);
727 out_err_objhash:
728         xhash_free(clonemap->objects);
729 out_err_clonemap:
730         free(clonemap);
731 out_err:
732         fail(peer, pr);
733         return -1;
734 }
735
736 static int req2objs(struct peerd *peer, struct peer_req *pr, 
737                                         struct map *map, int write)
738 {
739         char *target = xseg_get_target(peer->xseg, pr->req);
740         uint32_t nr_objs = calc_nr_obj(pr->req);
741         uint64_t size = sizeof(struct xseg_reply_map) + 
742                         nr_objs * sizeof(struct xseg_reply_map_scatterlist);
743
744         /* resize request to fit reply */
745         char buf[XSEG_MAX_TARGET_LEN];
746         strncpy(buf, target, pr->req->targetlen);
747         int r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
748         if (r < 0) {
749                 printf("couldn't resize req\n");
750                 return -1;
751         }
752         target = xseg_get_target(peer->xseg, pr->req);
753         strncpy(target, buf, pr->req->targetlen);
754
755         /* structure reply */
756         struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
757         reply->cnt = nr_objs;
758
759         uint32_t idx = 0;
760         uint64_t rem_size = pr->req->size;
761         uint64_t obj_index = pr->req->offset / block_size;
762         uint64_t obj_offset = pr->req->offset & (block_size -1); //modulo
763         uint64_t obj_size =  (rem_size > block_size) ? block_size - obj_offset : rem_size;
764         struct map_node * mn = find_object(map, obj_index);
765         if (!mn) {
766                 printf("coudn't find obj_index\n");
767                 goto out_err;
768         }
769         if (write && mn->flags & MF_OBJECT_COPYING) 
770                 goto out_object_copying;
771         if (write && !(mn->flags & MF_OBJECT_EXIST)) {
772                 //calc new_target, copy up object
773                 r = copyup_object(peer, mn, pr);
774                 if (r < 0) {
775                         printf("err_copy\n");
776                         goto out_err_copy;
777                 }
778                 mn->flags |= MF_OBJECT_COPYING;
779                 goto out_object_copying;
780         }
781
782         strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
783         reply->segs[idx].target[mn->objectlen] = 0;
784         reply->segs[idx].offset = obj_offset;
785         reply->segs[idx].size = obj_size;
786         rem_size -= obj_size;
787         while (rem_size > 0) {
788                 idx++;
789                 obj_index++;
790                 obj_offset = 0;
791                 obj_size = (rem_size - block_size > 0) ? block_size : rem_size;
792                 rem_size -= obj_size;
793                 mn = find_object(map, obj_index);
794                 if (!mn) {
795                         printf("coudn't find obj_index\n");
796                         goto out_err;
797                 }
798                 if (write && mn->flags & MF_OBJECT_COPYING) 
799                         goto out_object_copying;
800                 if (write && !(mn->flags & MF_OBJECT_EXIST)) {
801                         //calc new_target, copy up object
802                         r = copyup_object(peer, mn, pr);
803                         if (r < 0) {
804                                 printf("err_copy\n");
805                                 goto out_err_copy;
806                         }
807                         mn->flags |= MF_OBJECT_COPYING;
808                         goto out_object_copying;
809                 }
810                 strncpy(reply->segs[idx].target, mn->object, XSEG_MAX_TARGET_LEN); // or strlen(mn->target ?);
811                 reply->segs[idx].target[mn->objectlen] = 0;
812                 reply->segs[idx].offset = obj_offset;
813                 reply->segs[idx].size = obj_size;
814         }
815
816         return 0;
817
818 out_object_copying:
819         //printf("r2o mn: %lx\n", mn);
820         if(__xq_append_tail(&mn->pending, (xqindex) pr) == Noneidx)
821                 printf("couldn't append pr to tail\n");
822         return MF_PENDING;
823
824 out_err_copy:
825 out_err:
826         return -1;
827 }
828
829 static int handle_mapr(struct peerd *peer, struct peer_req *pr, 
830                                 struct xseg_request *req)
831 {
832         struct mapperd *mapper = __get_mapperd(peer);
833         struct mapper_io *mio = __get_mapper_io(pr);
834         (void)mio;
835         //get_map
836         char *target = xseg_get_target(peer->xseg, pr->req);
837         struct map *map;
838         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
839         if (r < 0) {
840                 fail(peer, pr);
841                 return -1;
842         }
843         else if (r == MF_PENDING)
844                 return 0;
845         
846         if (map->flags & MF_MAP_DESTROYED) {
847                 fail(peer, pr);
848                 return 0;
849         }
850         
851         //get_object
852         r = req2objs(peer, pr, map, 0);
853         if  (r < 0){
854                 fail(peer, pr);
855         }
856         else if (r == 0)
857                 complete(peer, pr);
858
859         return 0;
860
861
862 }
863
864 static int handle_copyup(struct peerd *peer, struct peer_req *pr,
865                                 struct xseg_request *req)
866 {
867         struct mapperd *mapper = __get_mapperd(peer);
868         (void) mapper;
869         struct mapper_io *mio = __get_mapper_io(pr);
870         int r = 0;
871         //printf("handle copyup reply\n");
872         if (req->state & XS_FAILED && !(req->state & XS_SERVED)) {
873                 //printf("copy up failed\n");
874                 mio->err = 1;
875                 r = 1;
876         }
877         struct map_node *mn = __get_copyup_node(mio, req);
878         if (!mn){
879                 //printf("copy up mn not found\n");
880                 mio->err =1; //BUG
881         }
882         else {
883                 //printf("mn: %lx\n", mn);
884                 mn->flags &= ~MF_OBJECT_COPYING;
885                 if (!r) {
886                         mn->flags |= MF_OBJECT_EXIST;
887                         char *target = xseg_get_target(peer->xseg, req);
888                         strncpy(mn->object, target, req->targetlen);
889                 }
890         }
891         __set_copyup_node(mio, req, NULL);
892         xseg_put_request(peer->xseg, req, peer->portno);
893
894         mio->copyups--;
895         if (mn) {
896                 //handle peer_requests waiting on copy up
897                 xqindex idx;
898                 //printf("foo\n");
899                 while ((idx = __xq_pop_head(&mn->pending)) != Noneidx){
900                         //printf("dispatching pending\n");
901                         struct peer_req * preq = (struct peer_req *) idx;
902                         dispatch(peer, preq, preq->req);
903                 }
904         }
905
906         return 0;
907 }
908
909 static int handle_mapw(struct peerd *peer, struct peer_req *pr, 
910                                 struct xseg_request *req)
911 {
912         struct mapperd *mapper = __get_mapperd(peer);
913         struct mapper_io *mio = __get_mapper_io(pr);
914         (void) mio;
915         /* handle copy up replies separately */
916         if (req->op == X_COPY)
917                 return handle_copyup(peer, pr, req);
918
919         char *target = xseg_get_target(peer->xseg, pr->req);
920         struct map *map;
921         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
922         if (r < 0) {
923                 fail(peer, pr);
924                 return -1;
925         }
926         else if (r == MF_PENDING)
927                 return 0;
928         
929         if (map->flags & MF_MAP_DESTROYED) {
930                 printf("map MF_MAP_DESTROYED req %lx\n", pr->req);
931                 fail(peer, pr);
932                 return 0;
933         }
934         if (mio->err) {
935                 //printf("mapw failed\n");
936                 fail(peer, pr);
937                 return 0;
938         }
939         //printf("handle mapw\n");
940
941         mio->err = 0;
942         r = req2objs(peer, pr, map, 1);
943         if (r < 0){
944                 printf("req2obj returned r < 0 for req %lx\n", pr->req);
945                 fail(peer, pr);
946         }
947         if (r == 0)
948                 complete(peer, pr);
949         //else copyup pending, wait for pr restart
950
951         return 0;
952 }
953
954 static int handle_snap(struct peerd *peer, struct peer_req *pr, 
955                                 struct xseg_request *req)
956 {
957         fail(peer, pr);
958         return 0;
959 }
960
961 static int handle_info(struct peerd *peer, struct peer_req *pr, 
962                                 struct xseg_request *req)
963 {
964         struct mapperd *mapper = __get_mapperd(peer);
965         struct mapper_io *mio = __get_mapper_io(pr);
966         (void) mio;
967         char *target = xseg_get_target(peer->xseg, pr->req);
968         if (!target) {
969                 fail(peer, pr);
970                 return 0;
971         }
972         //printf("Handle info\n");
973         struct map *map;
974         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
975         if (r < 0) {
976                 fail(peer, pr);
977                 return -1;
978         }
979         else if (r == MF_PENDING)
980                 return 0;
981         if (map->flags & MF_MAP_DESTROYED) {
982                 fail(peer, pr);
983                 return 0;
984         }
985         
986         struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
987         xinfo->size = map->size;
988         complete(peer, pr);
989
990         return 0;
991 }
992
993 static int handle_destroy(struct peerd *peer, struct peer_req *pr, 
994                                 struct xseg_request *req)
995 {
996         /*
997         struct map *map;
998         int r = find_or_load_map(peer, pr, target, pr->req->targetlen, &map);
999         if (r < 0) {
1000                 fail(peer, pr);
1001                 return -1;
1002         }
1003         else if (r == MF_PENDING)
1004                 return 0;
1005         map->flags |= MF_MAP_DESTROYED;
1006         */
1007         //delete map block
1008         //do not delete all objects
1009         //remove_map(mapper, map);
1010         //free(map, map_nodes, all allocated resources);
1011         //complete(peer, pr);
1012         fail(peer, pr);
1013         return 0;
1014 }
1015
1016 int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req)
1017 {
1018         struct mapperd *mapper = __get_mapperd(peer);
1019         (void) mapper;
1020         struct mapper_io *mio = __get_mapper_io(pr);
1021         (void) mio;
1022
1023         if (req->op == X_READ) {
1024                 /* catch map reads requests here */
1025                 handle_mapread(peer, pr, req);
1026                 return 0;
1027         }
1028
1029         switch (pr->req->op) {
1030                 /* primary xseg operations of mapper */
1031                 case X_CLONE: handle_clone(peer, pr, req); break;
1032                 case X_MAPR: handle_mapr(peer, pr, req); break;
1033                 case X_MAPW: handle_mapw(peer, pr, req); break;
1034 //              case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1035                 case X_INFO: handle_info(peer, pr, req); break;
1036                 case X_DELETE: handle_destroy(peer, pr, req); break;
1037                 default: break;
1038         }
1039         return 0;
1040 }
1041
1042 int custom_peer_init(struct peerd *peer, int argc, const char *argv[])
1043 {
1044         int i;
1045         unsigned char buf[SHA256_DIGEST_SIZE];
1046         char *zero;
1047         struct sha256_ctx sha256ctx;
1048         /* calculate out magic sha hash value */
1049         sha256_init_ctx(&sha256ctx);
1050         sha256_process_bytes(magic_string, strlen(magic_string), &sha256ctx);
1051         sha256_finish_ctx(&sha256ctx, magic_sha256);
1052
1053         /* calculate zero block */
1054         //FIXME check hash value
1055         zero = malloc(block_size);
1056         memset(zero, 0, block_size);
1057         sha256_init_ctx(&sha256ctx);
1058         sha256_process_bytes(zero, block_size, &sha256ctx);
1059         sha256_finish_ctx(&sha256ctx, buf);
1060         for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1061                 sprintf(zero_block + 2*i, "%02x", buf[i]);
1062         printf("%s \n", zero_block);
1063
1064         //FIXME error checks
1065         struct mapperd *mapper = malloc(sizeof(struct mapperd));
1066         mapper->hashmaps = xhash_new(3, STRING);
1067         peer->priv = mapper;
1068         
1069         for (i = 0; i < peer->nr_ops; i++) {
1070                 struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1071                 mio->copyups_nodes = xhash_new(3, INTEGER);
1072                 mio->copyups = 0;
1073                 mio->err = 0;
1074                 peer->peer_reqs[i].priv = mio;
1075         }
1076
1077         for (i = 0; i < argc; i++) {
1078                 if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1079                         mapper->bportno = atoi(argv[i+1]);
1080                         i += 1;
1081                         continue;
1082                 }
1083                 /* enforce only one thread */
1084                 if (!strcmp(argv[i], "-t") && (i+1) < argc){
1085                         int t = atoi(argv[i+1]);
1086                         if (t != 1) {
1087                                 printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1088                                 return -1;
1089                         }
1090                         i += 1;
1091                         continue;
1092                 }
1093         }
1094
1095         //test_map(peer);
1096
1097         return 0;
1098 }
1099
1100 void print_obj(struct map_node *mn)
1101 {
1102         printf("[%llu]object name: %s[%u] exists: %c\n", mn->objectidx, mn->object, mn->objectlen, 
1103                         (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
1104 }
1105
1106 void print_map(struct map *m)
1107 {
1108         uint64_t nr_objs = m->size/block_size;
1109         if (m->size % block_size)
1110                 nr_objs++;
1111         printf("Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
1112                         m->volume, m->volumelen, m->size, nr_objs);
1113         uint64_t i;
1114         struct map_node *mn;
1115         if (nr_objs > 1000000) //FIXME to protect against invalid volume size
1116                 return;
1117         for (i = 0; i < nr_objs; i++) {
1118                 mn = find_object(m, i);
1119                 if (!mn){
1120                         printf("object idx [%llu] not found!\n", i);
1121                         continue;
1122                 }
1123                 print_obj(mn);
1124         }
1125 }
1126
1127 void test_map(struct peerd *peer)
1128 {
1129         int i,j, ret;
1130         struct sha256_ctx sha256ctx;
1131         unsigned char buf[SHA256_DIGEST_SIZE];
1132         char buf_new[XSEG_MAX_TARGET_LEN + 20];
1133         struct map *m = malloc(sizeof(struct map));
1134         strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGET_LEN + 1);
1135         m->volume[XSEG_MAX_TARGET_LEN] = 0;
1136         strncpy(buf_new, m->volume, XSEG_MAX_TARGET_LEN);
1137         buf_new[XSEG_MAX_TARGET_LEN + 19] = 0;
1138         m->volumelen = XSEG_MAX_TARGET_LEN;
1139         m->size = 100*block_size;
1140         m->objects = xhash_new(3, INTEGER);
1141         struct map_node *map_node = calloc(100, sizeof(struct map_node));
1142         for (i = 0; i < 100; i++) {
1143                 sprintf(buf_new +XSEG_MAX_TARGET_LEN, "%u", i);
1144                 sha256_init_ctx(&sha256ctx);
1145                 sha256_process_bytes(buf_new, strlen(buf_new), &sha256ctx);
1146                 sha256_finish_ctx(&sha256ctx, buf);
1147                 for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
1148                         sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
1149                 }
1150                 map_node[i].objectidx = i;
1151                 map_node[i].objectlen = XSEG_MAX_TARGET_LEN;
1152                 map_node[i].flags = MF_OBJECT_EXIST;
1153                 ret = insert_object(m, &map_node[i]);
1154         }
1155
1156         char *data = malloc(block_size);
1157         mapheader_to_map(m, data);
1158         uint64_t pos = mapheader_size;
1159
1160         for (i = 0; i < 100; i++) {
1161                 map_node = find_object(m, i);
1162                 if (!map_node){
1163                         printf("no object node %d \n", i);
1164                         exit(1);
1165                 }
1166                 object_to_map(data+pos, map_node);
1167                 pos += objectsize_in_map;
1168         }
1169 //      print_map(m);
1170
1171         struct map *m2 = malloc(sizeof(struct map));
1172         strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGET_LEN +1);
1173         m->volume[XSEG_MAX_TARGET_LEN] = 0;
1174         m->volumelen = XSEG_MAX_TARGET_LEN;
1175
1176         m2->objects = xhash_new(3, INTEGER);
1177         ret = read_map(peer, m2, data);
1178 //      print_map(m2);
1179
1180         int fd = open(m->volume, O_CREAT|O_WRONLY);
1181         ssize_t r, sum = 0;
1182         while (sum < block_size) {
1183                 r = write(fd, data + sum, block_size -sum);
1184                 if (r < 0){
1185                         perror("write");
1186                         printf("write error\n");
1187                         exit(1);
1188                 } 
1189                 sum += r;
1190         }
1191         close(fd);
1192         map_node = find_object(m, 0);
1193         free(map_node);
1194         free(m);
1195 }
1196