Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-mapperd.c @ 2979e589

History | View | Annotate | Download (55.9 kB)

1
#include <stdio.h>
2
#include <unistd.h>
3
#include <sys/types.h>
4
#include <pthread.h>
5
#include <xseg/xseg.h>
6
#include <peer.h>
7
#include <time.h>
8
#include <xtypes/xlock.h>
9
#include <xtypes/xhash.h>
10
#include <xseg/protocol.h>
11
#include <sys/stat.h>
12
#include <fcntl.h>
13
#include <gcrypt.h>
14
#include <errno.h>
15
#include <sched.h>
16
#include <sys/syscall.h>
17

    
18
GCRY_THREAD_OPTION_PTHREAD_IMPL;
19
#define MF_LOAD         (1 << 0)
20
#define MF_EXCLUSIVE         (1 << 1)
21
#define MF_FORCE         (1 << 2)
22

    
23
#define SHA256_DIGEST_SIZE 32
24
/* hex representation of sha256 value takes up double the sha256 size */
25
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26

    
27
#define block_size (1<<22) //FIXME this should be defined here?
28
#define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29
#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
30

    
31
#define MF_OBJECT_EXIST                (1 << 0)
32
#define MF_OBJECT_COPYING        (1 << 1)
33
#define MF_OBJECT_WRITING        (1 << 2)
34
#define MF_OBJECT_DELETING        (1 << 3)
35
#define MF_OBJECT_DELETED        (1 << 4)
36
#define MF_OBJECT_DESTROYED        (1 << 5)
37

    
38
#define MF_OBJECT_NOT_READY        (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
39

    
40
char *magic_string = "This a magic string. Please hash me";
41
unsigned char magic_sha256[SHA256_DIGEST_SIZE];        /* sha256 hash value of magic string */
42
char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
43

    
44
//dispatch_internal mapper states
45
/* Internal state machine for dispatch_internal(): what the current
 * mapper I/O is in the middle of doing. */
enum mapper_state {
	ACCEPTED = 0,		/* request accepted, nothing in flight yet */
	WRITING = 1,		/* map write outstanding */
	COPYING = 2,		/* object copy-up outstanding */
	DELETING = 3,		/* object deletion outstanding */
	DROPPING_CACHE = 4	/* cached map teardown in progress */
};
52

    
53
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
54

    
55
struct map_node {
56
        uint32_t flags;
57
        uint32_t objectidx;
58
        uint32_t objectlen;
59
        char object[XSEG_MAX_TARGETLEN + 1];         /* NULL terminated string */
60
        struct map *map;
61
        uint32_t ref;
62
        uint32_t waiters;
63
        st_cond_t cond;
64
};
65

    
66
#define MF_MAP_LOADING                (1 << 0)
67
#define MF_MAP_DESTROYED        (1 << 1)
68
#define MF_MAP_WRITING                (1 << 2)
69
#define MF_MAP_DELETING                (1 << 3)
70
#define MF_MAP_DROPPING_CACHE        (1 << 4)
71
#define MF_MAP_EXCLUSIVE        (1 << 5)
72
#define MF_MAP_OPENING                (1 << 6)
73
#define MF_MAP_CLOSING                (1 << 7)
74

    
75
#define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76
                                        MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
77

    
78
/*
 * Green-thread synchronization helpers.  The global `ta' counts runnable
 * threads: a waiter decrements it before sleeping on its condition and a
 * signaler re-increments it for every thread it wakes.
 *
 * Fix: wait_on_pr() and signal_pr() referenced the caller's variable `pr'
 * instead of their own __pr parameter, so they silently only worked when
 * invoked with an argument literally named `pr'.  Use the parameter
 * throughout (all existing call sites pass `pr', so expansion is
 * unchanged for them).
 */
#define wait_on_pr(__pr, __condition__)		\
	while (__condition__){			\
		ta--;				\
		__get_mapper_io(__pr)->active = 0;\
		XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  __pr, ta); \
		st_cond_wait(__pr->cond);	\
	}

#define wait_on_mapnode(__mn, __condition__)	\
	while (__condition__){			\
		ta--;				\
		__mn->waiters++;		\
		XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
		st_cond_wait(__mn->cond);	\
	}

#define wait_on_map(__map, __condition__)	\
	while (__condition__){			\
		ta--;				\
		__map->waiters++;		\
		XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
		st_cond_wait(__map->cond);	\
	}

#define signal_pr(__pr)				\
	do {					\
		if (!__get_mapper_io(__pr)->active){\
			ta++;			\
			XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  __pr, ta); \
			__get_mapper_io(__pr)->active = 1;\
			st_cond_signal(__pr->cond);	\
		}				\
	}while(0)

#define signal_map(__map)			\
	do {					\
		if (__map->waiters) {		\
			ta += 1;		\
			XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
			__map->waiters--;	\
			st_cond_signal(__map->cond);	\
		}				\
	}while(0)

#define signal_mapnode(__mn)			\
	do {					\
		if (__mn->waiters) {		\
			ta += __mn->waiters;	\
			XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
			__mn->waiters = 0;	\
			st_cond_broadcast(__mn->cond);	\
		}				\
	}while(0)
131

    
132

    
133
struct map {
134
        uint32_t flags;
135
        uint64_t size;
136
        uint32_t volumelen;
137
        char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138
        xhash_t *objects;         /* obj_index --> map_node */
139
        uint32_t ref;
140
        uint32_t waiters;
141
        st_cond_t cond;
142
};
143

    
144
struct mapperd {
145
        xport bportno;                /* blocker that accesses data */
146
        xport mbportno;                /* blocker that accesses maps */
147
        xhash_t *hashmaps; // hash_function(target) --> struct map
148
};
149

    
150
struct mapper_io {
151
        volatile uint32_t copyups;        /* nr of copyups pending, issued by this mapper io */
152
        xhash_t *copyups_nodes;                /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153
        struct map_node *copyup_node;
154
        volatile int err;                        /* error flag */
155
        volatile uint64_t del_pending;
156
        uint64_t delobj;
157
        uint64_t dcobj;
158
        cb_t cb;
159
        enum mapper_state state;
160
        volatile int active;
161
};
162

    
163
/* global vars */
164
struct mapperd *mapper;
165

    
166
void print_map(struct map *m);
167

    
168
/*
169
 * Helper functions
170
 */
171

    
172
static inline struct mapperd * __get_mapperd(struct peerd *peer)
173
{
174
        return (struct mapperd *) peer->priv;
175
}
176

    
177
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
178
{
179
        return (struct mapper_io *) pr->priv;
180
}
181

    
182
static inline uint64_t calc_map_obj(struct map *map)
183
{
184
        if (map->size == -1)
185
                return 0;
186
        uint64_t nr_objs = map->size / block_size;
187
        if (map->size % block_size)
188
                nr_objs++;
189
        return nr_objs;
190
}
191

    
192
static uint32_t calc_nr_obj(struct xseg_request *req)
193
{
194
        unsigned int r = 1;
195
        uint64_t rem_size = req->size;
196
        uint64_t obj_offset = req->offset & (block_size -1); //modulo
197
        uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198
        rem_size -= obj_size;
199
        while (rem_size > 0) {
200
                obj_size = (rem_size > block_size) ? block_size : rem_size;
201
                rem_size -= obj_size;
202
                r++;
203
        }
204

    
205
        return r;
206
}
207

    
208
/*
209
 * Maps handling functions
210
 */
211
//FIXME assert targetlen > 0
212

    
213

    
214
/* Look up a cached map by volume name.  target is not NUL terminated,
 * so a bounded local copy is made first (assumes
 * targetlen <= XSEG_MAX_TARGETLEN).  Returns NULL when not cached. */
static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
{
	struct map *found = NULL;
	char name[XSEG_MAX_TARGETLEN + 1];

	strncpy(name, target, targetlen);
	name[targetlen] = 0;
	XSEGLOG2(&lc, D, "looking up map %s, len %u", name, targetlen);
	if (xhash_lookup(mapper->hashmaps, (xhashidx) name,
				(xhashidx *) &found) < 0)
		return NULL;
	return found;
}
228

    
229

    
230
static int insert_map(struct mapperd *mapper, struct map *map)
231
{
232
        int r = -1;
233
        
234
        if (find_map(mapper, map->volume, map->volumelen)){
235
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
236
                goto out;
237
        }
238

    
239
        XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
240
                        map->volume, strlen(map->volume), (unsigned long) map);
241
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242
        while (r == -XHASH_ERESIZE) {
243
                xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
245
                if (!new_hashmap){
246
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247
                                        (unsigned long long) shift);
248
                        goto out;
249
                }
250
                mapper->hashmaps = new_hashmap;
251
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
252
        }
253
out:
254
        return r;
255
}
256

    
257
static int remove_map(struct mapperd *mapper, struct map *map)
258
{
259
        int r = -1;
260
        
261
        //assert no pending pr on map
262
        
263
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264
        while (r == -XHASH_ERESIZE) {
265
                xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
267
                if (!new_hashmap){
268
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269
                                        (unsigned long long) shift);
270
                        goto out;
271
                }
272
                mapper->hashmaps = new_hashmap;
273
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
274
        }
275
out:
276
        return r;
277
}
278

    
279
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
280
{
281
        int r;
282
        xport p;
283
        struct peerd *peer = pr->peer;
284
        struct xseg_request *req;
285
        struct mapperd *mapper = __get_mapperd(peer);
286
        void *dummy;
287

    
288
        XSEGLOG2(&lc, I, "Closing map %s", map->volume);
289

    
290
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
291
        if (!req){
292
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
293
                                map->volume);
294
                goto out_err;
295
        }
296

    
297
        r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
298
        if (r < 0){
299
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
300
                                map->volume);
301
                goto out_put;
302
        }
303

    
304
        char *reqtarget = xseg_get_target(peer->xseg, req);
305
        if (!reqtarget)
306
                goto out_put;
307
        strncpy(reqtarget, map->volume, req->targetlen);
308
        req->op = X_CLOSE;
309
        req->size = block_size;
310
        req->offset = 0;
311
        r = xseg_set_req_data(peer->xseg, req, pr);
312
        if (r < 0){
313
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
314
                                map->volume);
315
                goto out_put;
316
        }
317
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
318
        if (p == NoPort){ 
319
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
320
                                map->volume);
321
                goto out_unset;
322
        }
323
        r = xseg_signal(peer->xseg, p);
324
        
325
        XSEGLOG2(&lc, I, "Map %s closing", map->volume);
326
        return req;
327

    
328
out_unset:
329
        xseg_get_req_data(peer->xseg, req, &dummy);
330
out_put:
331
        xseg_put_request(peer->xseg, req, pr->portno);
332
out_err:
333
        return NULL;
334
}
335

    
336
static int close_map(struct peer_req *pr, struct map *map)
337
{
338
        int err;
339
        struct xseg_request *req;
340
        struct peerd *peer = pr->peer;
341

    
342
        map->flags |= MF_MAP_CLOSING;
343
        req = __close_map(pr, map);
344
        if (!req)
345
                return -1;
346
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347
        map->flags &= ~MF_MAP_CLOSING;
348
        err = req->state & XS_FAILED;
349
        xseg_put_request(peer->xseg, req, pr->portno);
350
        if (err)
351
                return -1;
352
        return 0;
353
}
354

    
355
/*
356
static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
357
                                char *target, uint32_t targetlen, struct map **m)
358
{
359
        struct mapperd *mapper = __get_mapperd(peer);
360
        int r;
361
        *m = find_map(mapper, target, targetlen);
362
        if (*m) {
363
                XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364
                if ((*m)->flags & MF_MAP_NOT_READY) {
365
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
366
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
367
                        return MF_PENDING;
368
                //} else if ((*m)->flags & MF_MAP_DESTROYED){
369
                //        return -1;
370
                // 
371
                }else {
372
                        XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
373
                        return 0;
374
                }
375
        }
376
        r = open_map(peer, pr, target, targetlen, 0);
377
        if (r < 0)
378
                return -1; //error
379
        return MF_PENDING;        
380
}
381
*/
382
/*
383
 * Object handling functions
384
 */
385

    
386
/* Look up the cached map node for the given object index.
 * Returns NULL when the index is not present. */
struct map_node *find_object(struct map *map, uint64_t obj_index)
{
	struct map_node *node = NULL;

	if (xhash_lookup(map->objects, obj_index, (xhashidx *) &node) < 0)
		return NULL;
	return node;
}
394

    
395
static int insert_object(struct map *map, struct map_node *mn)
396
{
397
        //FIXME no find object first
398
        int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399
        if (r == -XHASH_ERESIZE) {
400
                unsigned long shift = xhash_grow_size_shift(map->objects);
401
                map->objects = xhash_resize(map->objects, shift, NULL);
402
                if (!map->objects)
403
                        return -1;
404
                r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
405
        }
406
        return r;
407
}
408

    
409

    
410
/*
411
 * map read/write functions 
412
 */
413
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
414
{
415
        int i;
416
        //hexlify sha256 value
417
        for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418
                sprintf(mn->object+2*i, "%02x", buf[i]);
419
        }
420

    
421
        mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422
        mn->objectlen = SHA256_DIGEST_SIZE * 2;
423
        mn->flags = MF_OBJECT_EXIST;
424
}
425

    
426
static inline void map_to_object(struct map_node *mn, char *buf)
427
{
428
        char c = buf[0];
429
        mn->flags = 0;
430
        if (c)
431
                mn->flags |= MF_OBJECT_EXIST;
432
        memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433
        mn->object[XSEG_MAX_TARGETLEN] = 0;
434
        mn->objectlen = strlen(mn->object);
435
}
436

    
437
static inline void object_to_map(char* buf, struct map_node *mn)
438
{
439
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440
        memcpy(buf+1, mn->object, mn->objectlen);
441
        memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
442
}
443

    
444
static inline void mapheader_to_map(struct map *m, char *buf)
445
{
446
        uint64_t pos = 0;
447
        memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448
        pos += SHA256_DIGEST_SIZE;
449
        memcpy(buf + pos, &m->size, sizeof(m->size));
450
        pos += sizeof(m->size);
451
}
452

    
453

    
454
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
455
                                struct map *map, struct map_node *mn)
456
{
457
        void *dummy;
458
        struct mapperd *mapper = __get_mapperd(peer);
459
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460
                                                        mapper->mbportno, X_ALLOC);
461
        if (!req){
462
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
463
                                "(Map: %s [%llu]",
464
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
465
                goto out_err;
466
        }
467
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
468
        if (r < 0){
469
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
470
                                "(Map: %s [%llu]",
471
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
472
                goto out_put;
473
        }
474
        char *target = xseg_get_target(peer->xseg, req);
475
        strncpy(target, map->volume, req->targetlen);
476
        req->size = objectsize_in_map;
477
        req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
478
        req->op = X_WRITE;
479
        char *data = xseg_get_data(peer->xseg, req);
480
        object_to_map(data, mn);
481

    
482
        r = xseg_set_req_data(peer->xseg, req, pr);
483
        if (r < 0){
484
                XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
485
                                "(Map: %s [%llu]",
486
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
487
                goto out_put;
488
        }
489
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
490
        if (p == NoPort){
491
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
492
                                "(Map: %s [%llu]",
493
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
494
                goto out_unset;
495
        }
496
        r = xseg_signal(peer->xseg, p);
497
        if (r < 0)
498
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
499

    
500
        XSEGLOG2(&lc, I, "Writing object %s \n\t"
501
                        "Map: %s [%llu]",
502
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
503

    
504
        return req;
505

    
506
out_unset:
507
        xseg_get_req_data(peer->xseg, req, &dummy);
508
out_put:
509
        xseg_put_request(peer->xseg, req, pr->portno);
510
out_err:
511
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
512
                        "(Map: %s [%llu]",
513
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
514
        return NULL;
515
}
516

    
517
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
518
{
519
        void *dummy;
520
        struct peerd *peer = pr->peer;
521
        struct mapperd *mapper = __get_mapperd(peer);
522
        struct map_node *mn;
523
        uint64_t i, pos, max_objidx = calc_map_obj(map);
524
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
525
                                                        mapper->mbportno, X_ALLOC);
526
        if (!req){
527
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
528
                goto out_err;
529
        }
530
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
531
                                        mapheader_size + max_objidx * objectsize_in_map);
532
        if (r < 0){
533
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
534
                goto out_put;
535
        }
536
        char *target = xseg_get_target(peer->xseg, req);
537
        strncpy(target, map->volume, req->targetlen);
538
        char *data = xseg_get_data(peer->xseg, req);
539
        mapheader_to_map(map, data);
540
        pos = mapheader_size;
541
        req->op = X_WRITE;
542
        req->size = req->datalen;
543
        req->offset = 0;
544

    
545
        if (map->size % block_size)
546
                max_objidx++;
547
        for (i = 0; i < max_objidx; i++) {
548
                mn = find_object(map, i);
549
                if (!mn){
550
                        XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551
                                        (unsigned long long) i, map->volume);
552
                        goto out_put;
553
                }
554
                object_to_map(data+pos, mn);
555
                pos += objectsize_in_map;
556
        }
557
        r = xseg_set_req_data(peer->xseg, req, pr);
558
        if (r < 0){
559
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
560
                                map->volume);
561
                goto out_put;
562
        }
563
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
564
        if (p == NoPort){
565
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
566
                                map->volume);
567
                goto out_unset;
568
        }
569
        r = xseg_signal(peer->xseg, p);
570
        if (r < 0)
571
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
572

    
573
        map->flags |= MF_MAP_WRITING;
574
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
575
        return req;
576

    
577
out_unset:
578
        xseg_get_req_data(peer->xseg, req, &dummy);
579
out_put:
580
        xseg_put_request(peer->xseg, req, pr->portno);
581
out_err:
582
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
583
        return NULL;
584
}
585

    
586
static int write_map(struct peer_req* pr, struct map *map)
587
{
588
        int r = 0;
589
        struct peerd *peer = pr->peer;
590
        map->flags |= MF_MAP_WRITING;
591
        struct xseg_request *req = __write_map(pr, map);
592
        if (!req)
593
                return -1;
594
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595
        if (req->state & XS_FAILED)
596
                r = -1;
597
        xseg_put_request(peer->xseg, req, pr->portno);
598
        map->flags &= ~MF_MAP_WRITING;
599
        return r;
600
}
601

    
602
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
603
{
604
        int r;
605
        xport p;
606
        struct xseg_request *req;
607
        struct peerd *peer = pr->peer;
608
        struct mapperd *mapper = __get_mapperd(peer);
609
        void *dummy;
610

    
611
        XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
612

    
613
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
614
        if (!req){
615
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
616
                                m->volume);
617
                goto out_fail;
618
        }
619

    
620
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
621
        if (r < 0){
622
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
623
                                m->volume);
624
                goto out_put;
625
        }
626

    
627
        char *reqtarget = xseg_get_target(peer->xseg, req);
628
        if (!reqtarget)
629
                goto out_put;
630
        strncpy(reqtarget, m->volume, req->targetlen);
631
        req->op = X_READ;
632
        req->size = block_size;
633
        req->offset = 0;
634
        r = xseg_set_req_data(peer->xseg, req, pr);
635
        if (r < 0){
636
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
637
                                m->volume);
638
                goto out_put;
639
        }
640
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
641
        if (p == NoPort){ 
642
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
643
                                m->volume);
644
                goto out_unset;
645
        }
646
        r = xseg_signal(peer->xseg, p);
647
        
648
        XSEGLOG2(&lc, I, "Map %s loading", m->volume);
649
        return req;
650

    
651
out_unset:
652
        xseg_get_req_data(peer->xseg, req, &dummy);
653
out_put:
654
        xseg_put_request(peer->xseg, req, pr->portno);
655
out_fail:
656
        return NULL;
657
}
658

    
659
static int read_map (struct map *map, char *buf)
660
{
661
        char nulls[SHA256_DIGEST_SIZE];
662
        memset(nulls, 0, SHA256_DIGEST_SIZE);
663

    
664
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
665
        if (r) {
666
                XSEGLOG2(&lc, D, "Read zeros");
667
                //read error;
668
                return -1;
669
        }
670
        //type 1, our type, type 0 pithos map
671
        int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
673
        uint64_t pos;
674
        uint64_t i, nr_objs;
675
        struct map_node *map_node;
676
        if (type) {
677
                pos = SHA256_DIGEST_SIZE;
678
                map->size = *(uint64_t *) (buf + pos);
679
                pos += sizeof(uint64_t);
680
                nr_objs = map->size / block_size;
681
                if (map->size % block_size)
682
                        nr_objs++;
683
                map_node = calloc(nr_objs, sizeof(struct map_node));
684
                if (!map_node)
685
                        return -1;
686

    
687
                for (i = 0; i < nr_objs; i++) {
688
                        map_node[i].map = map;
689
                        map_node[i].objectidx = i;
690
                        map_node[i].waiters = 0;
691
                        map_node[i].ref = 1;
692
                        map_node[i].cond = st_cond_new(); //FIXME err check;
693
                        map_to_object(&map_node[i], buf + pos);
694
                        pos += objectsize_in_map;
695
                        r = insert_object(map, &map_node[i]); //FIXME error check
696
                }
697
        } else {
698
                pos = 0;
699
                uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700
                map_node = calloc(max_nr_objs, sizeof(struct map_node));
701
                if (!map_node)
702
                        return -1;
703
                for (i = 0; i < max_nr_objs; i++) {
704
                        if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
705
                                break;
706
                        map_node[i].objectidx = i;
707
                        map_node[i].map = map;
708
                        map_node[i].waiters = 0;
709
                        map_node[i].ref = 1;
710
                        map_node[i].cond = st_cond_new(); //FIXME err check;
711
                        pithosmap_to_object(&map_node[i], buf + pos);
712
                        pos += SHA256_DIGEST_SIZE; 
713
                        r = insert_object(map, &map_node[i]); //FIXME error check
714
                }
715
                map->size = i * block_size; 
716
        }
717
        print_map(map);
718
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
719
        return 0;
720

    
721
        //FIXME cleanup on error
722
}
723

    
724
static int load_map(struct peer_req *pr, struct map *map)
725
{
726
        int r = 0;
727
        struct xseg_request *req;
728
        struct peerd *peer = pr->peer;
729
        map->flags |= MF_MAP_LOADING;
730
        req = __load_map(pr, map);
731
        if (!req)
732
                return -1;
733
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734
        map->flags &= ~MF_MAP_LOADING;
735
        if (req->state & XS_FAILED){
736
                XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737
                xseg_put_request(peer->xseg, req, pr->portno);
738
                return -1;
739
        }
740
        r = read_map(map, xseg_get_data(peer->xseg, req));
741
        xseg_put_request(peer->xseg, req, pr->portno);
742
        return r;
743
}
744

    
745
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
746
{
747
        int r;
748
        xport p;
749
        struct xseg_request *req;
750
        struct peerd *peer = pr->peer;
751
        struct mapperd *mapper = __get_mapperd(peer);
752
        void *dummy;
753

    
754
        XSEGLOG2(&lc, I, "Opening map %s", m->volume);
755

    
756
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
757
        if (!req){
758
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
759
                                m->volume);
760
                goto out_fail;
761
        }
762

    
763
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
764
        if (r < 0){
765
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
766
                                m->volume);
767
                goto out_put;
768
        }
769

    
770
        char *reqtarget = xseg_get_target(peer->xseg, req);
771
        if (!reqtarget)
772
                goto out_put;
773
        strncpy(reqtarget, m->volume, req->targetlen);
774
        req->op = X_OPEN;
775
        req->size = block_size;
776
        req->offset = 0;
777
        r = xseg_set_req_data(peer->xseg, req, pr);
778
        if (r < 0){
779
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
780
                                m->volume);
781
                goto out_put;
782
        }
783
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
784
        if (p == NoPort){ 
785
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
786
                                m->volume);
787
                goto out_unset;
788
        }
789
        r = xseg_signal(peer->xseg, p);
790
        
791
        XSEGLOG2(&lc, I, "Map %s opening", m->volume);
792
        return req;
793

    
794
out_unset:
795
        xseg_get_req_data(peer->xseg, req, &dummy);
796
out_put:
797
        xseg_put_request(peer->xseg, req, pr->portno);
798
out_fail:
799
        return NULL;
800
}
801

    
802
static int open_map(struct peer_req *pr, struct map *map)
803
{
804
        int err;
805
        struct xseg_request *req;
806
        struct peerd *peer = pr->peer;
807

    
808
        map->flags |= MF_MAP_OPENING;
809
        req = __open_map(pr, map);
810
        if (!req)
811
                return -1;
812
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
813
        map->flags &= ~MF_MAP_OPENING;
814
        err = req->state & XS_FAILED;
815
        xseg_put_request(peer->xseg, req, pr->portno);
816
        if (err)
817
                return -1;
818
        else 
819
                map->flags |= MF_MAP_EXCLUSIVE;
820
        return 0;
821
}
822

    
823
/*
824
 * copy up functions
825
 */
826

    
827
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
828
{
829
        int r = 0;
830
        if (mn){
831
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
832
                if (r == -XHASH_ERESIZE) {
833
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
834
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
835
                        if (!new_hashmap)
836
                                goto out;
837
                        mio->copyups_nodes = new_hashmap;
838
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
839
                }
840
        }
841
        else {
842
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
843
                if (r == -XHASH_ERESIZE) {
844
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
845
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
846
                        if (!new_hashmap)
847
                                goto out;
848
                        mio->copyups_nodes = new_hashmap;
849
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
850
                }
851
        }
852
out:
853
        return r;
854
}
855

    
856
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
857
{
858
        struct map_node *mn;
859
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
860
        if (r < 0)
861
                return NULL;
862
        return mn;
863
}
864

    
865
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
866
{
867
        struct mapperd *mapper = __get_mapperd(peer);
868
        struct mapper_io *mio = __get_mapper_io(pr);
869
        struct map *map = mn->map;
870
        void *dummy;
871
        int r = -1, i;
872
        xport p;
873

    
874
        //struct sha256_ctx sha256ctx;
875
        uint32_t newtargetlen;
876
        char new_target[XSEG_MAX_TARGETLEN + 1]; 
877
        unsigned char buf[SHA256_DIGEST_SIZE];        //assert sha256_digest_size(32) <= MAXTARGETLEN 
878
        char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
879
        strncpy(new_object, map->volume, map->volumelen);
880
        sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
881
        new_object[XSEG_MAX_TARGETLEN + 19] = 0;
882

    
883
        gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
884
        for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
885
                sprintf (new_target + 2*i, "%02x", buf[i]);
886
        newtargetlen = SHA256_DIGEST_SIZE  * 2;
887

    
888
        if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
889
                goto copyup_zeroblock;
890

    
891
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
892
                                                        mapper->bportno, X_ALLOC);
893
        if (!req){
894
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
895
                goto out_err;
896
        }
897
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
898
                                sizeof(struct xseg_request_copy));
899
        if (r < 0){
900
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
901
                goto out_put;
902
        }
903

    
904
        char *target = xseg_get_target(peer->xseg, req);
905
        strncpy(target, new_target, req->targetlen);
906

    
907
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
908
        strncpy(xcopy->target, mn->object, mn->objectlen);
909
        xcopy->targetlen = mn->objectlen;
910

    
911
        req->offset = 0;
912
        req->size = block_size;
913
        req->op = X_COPY;
914
        r = xseg_set_req_data(peer->xseg, req, pr);
915
        if (r<0){
916
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
917
                goto out_put;
918
        }
919
        r = __set_copyup_node(mio, req, mn);
920
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
921
        if (p == NoPort) {
922
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
923
                goto out_unset;
924
        }
925
        xseg_signal(peer->xseg, p);
926
//        mio->copyups++;
927

    
928
        mn->flags |= MF_OBJECT_COPYING;
929
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
930
        return req;
931

    
932
out_unset:
933
        r = __set_copyup_node(mio, req, NULL);
934
        xseg_get_req_data(peer->xseg, req, &dummy);
935
out_put:
936
        xseg_put_request(peer->xseg, req, pr->portno);
937
out_err:
938
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
939
        return NULL;
940

    
941
copyup_zeroblock:
942
        XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
943
                        "Proceeding in writing the new object in map");
944
        /* construct a tmp map_node for writing purposes */
945
        struct map_node newmn = *mn;
946
        newmn.flags = MF_OBJECT_EXIST;
947
        strncpy(newmn.object, new_target, newtargetlen);
948
        newmn.object[newtargetlen] = 0;
949
        newmn.objectlen = newtargetlen;
950
        newmn.objectidx = mn->objectidx; 
951
        req = object_write(peer, pr, map, &newmn);
952
        r = __set_copyup_node(mio, req, mn);
953
        if (!req){
954
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
955
                                "\n\t of map %s [%llu]",
956
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
957
                return NULL;
958
        }
959
        mn->flags |= MF_OBJECT_WRITING;
960
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
961
        return req;
962
}
963

    
964
static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
965
{
966
        void *dummy;
967
        struct peerd *peer = pr->peer;
968
        struct mapperd *mapper = __get_mapperd(peer);
969
        struct mapper_io *mio = __get_mapper_io(pr);
970
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
971
                                                        mapper->bportno, X_ALLOC);
972
        XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
973
        if (!req){
974
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
975
                goto out_err;
976
        }
977
        int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
978
        if (r < 0){
979
                XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
980
                goto out_put;
981
        }
982
        char *target = xseg_get_target(peer->xseg, req);
983
        strncpy(target, mn->object, req->targetlen);
984
        req->op = X_DELETE;
985
        req->size = req->datalen;
986
        req->offset = 0;
987
        r = xseg_set_req_data(peer->xseg, req, pr);
988
        if (r < 0){
989
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
990
                goto out_put;
991
        }
992
        __set_copyup_node(mio, req, mn);
993
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
994
        if (p == NoPort){
995
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
996
                goto out_unset;
997
        }
998
        r = xseg_signal(peer->xseg, p);
999
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1000
        return req;
1001

    
1002
out_unset:
1003
        xseg_get_req_data(peer->xseg, req, &dummy);
1004
out_put:
1005
        xseg_put_request(peer->xseg, req, pr->portno);
1006
out_err:
1007
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1008
        return NULL;
1009
}
1010

    
1011
static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1012
{
1013
        void *dummy;
1014
        struct peerd *peer = pr->peer;
1015
        struct mapperd *mapper = __get_mapperd(peer);
1016
        struct mapper_io *mio = __get_mapper_io(pr);
1017
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1018
                                                        mapper->mbportno, X_ALLOC);
1019
        XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1020
        if (!req){
1021
                XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1022
                goto out_err;
1023
        }
1024
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1025
        if (r < 0){
1026
                XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1027
                goto out_put;
1028
        }
1029
        char *target = xseg_get_target(peer->xseg, req);
1030
        strncpy(target, map->volume, req->targetlen);
1031
        req->op = X_DELETE;
1032
        req->size = req->datalen;
1033
        req->offset = 0;
1034
        r = xseg_set_req_data(peer->xseg, req, pr);
1035
        if (r < 0){
1036
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1037
                goto out_put;
1038
        }
1039
        __set_copyup_node(mio, req, NULL);
1040
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1041
        if (p == NoPort){
1042
                XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1043
                goto out_unset;
1044
        }
1045
        r = xseg_signal(peer->xseg, p);
1046
        map->flags |= MF_MAP_DELETING;
1047
        XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1048
        return req;
1049

    
1050
out_unset:
1051
        xseg_get_req_data(peer->xseg, req, &dummy);
1052
out_put:
1053
        xseg_put_request(peer->xseg, req, pr->portno);
1054
out_err:
1055
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1056
        return  NULL;
1057
}
1058

    
1059

    
1060
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1061
{
1062
        struct map_node *mn = find_object(map, index);
1063
        if (mn)
1064
                mn->ref++;
1065
        return mn;
1066
}
1067

    
1068
static inline void put_mapnode(struct map_node *mn)
1069
{
1070
        mn->ref--;
1071
        if (!mn->ref){
1072
                //clean up mn
1073
                st_cond_destroy(mn->cond);
1074
        }
1075
}
1076

    
1077
/* Take a reference on the map; released with put_map(). */
static inline void __get_map(struct map *map)
{
        map->ref++;
}
1081

    
1082
static inline void put_map(struct map *map)
1083
{
1084
        struct map_node *mn;
1085
        map->ref--;
1086
        if (!map->ref){
1087
                XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1088
                //clean up map
1089
                uint64_t i;
1090
                for (i = 0; i < calc_map_obj(map); i++) {
1091
                        mn = get_mapnode(map, i);
1092
                        if (mn) {
1093
                                //make sure all pending operations on all objects are completed
1094
                                if (mn->flags & MF_OBJECT_NOT_READY){
1095
                                        //this should never happen...
1096
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1097
                                }
1098
                                mn->flags &= MF_OBJECT_DESTROYED;
1099
                                put_mapnode(mn); //matchin mn->ref = 1 on mn init
1100
                                put_mapnode(mn); //matcing get_mapnode;
1101
                                //assert mn->ref == 0;
1102
                        }
1103
                }
1104
                mn = find_object(map, 0);
1105
                if (mn)
1106
                        free(mn);
1107
                XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1108
                free(map);
1109
        }
1110
}
1111

    
1112
static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1113
{
1114
        int r;
1115
        struct map *m = malloc(sizeof(struct map));
1116
        if (!m){
1117
                XSEGLOG2(&lc, E, "Cannot allocate map ");
1118
                goto out_err;
1119
        }
1120
        m->size = -1;
1121
        strncpy(m->volume, name, namelen);
1122
        m->volume[namelen] = 0;
1123
        m->volumelen = namelen;
1124
        m->flags = 0;
1125
        m->objects = xhash_new(3, INTEGER); 
1126
        if (!m->objects){
1127
                XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1128
                                m->volume);
1129
                goto out_map;
1130
        }
1131
        m->ref = 1;
1132
        m->waiters = 0;
1133
        m->cond = st_cond_new(); //FIXME err check;
1134
        r = insert_map(mapper, m);
1135
        if (r < 0){
1136
                XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1137
                goto out_hash;
1138
        }
1139

    
1140
        return m;
1141

    
1142
out_hash:
1143
        xhash_free(m->objects);
1144
out_map:
1145
        XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1146
        free(m);
1147
out_err:
1148
        return NULL;
1149
}
1150

    
1151

    
1152

    
1153
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1154
{
1155
        struct peerd *peer = pr->peer;
1156
        struct mapperd *mapper = __get_mapperd(peer);
1157
        (void)mapper;
1158
        struct mapper_io *mio = __get_mapper_io(pr);
1159
        struct map_node *mn = __get_copyup_node(mio, req);
1160

    
1161
        mio->del_pending--;
1162
        if (req->state & XS_FAILED){
1163
                mio->err = 1;
1164
        }
1165
        signal_mapnode(mn);
1166
        xseg_put_request(peer->xseg, req, pr->portno);
1167
        signal_pr(pr);
1168
}
1169

    
1170
/*
 * Completion callback for the copy-up state machine.
 *
 * A copy-up is two chained requests: an X_COPY that duplicates the object's
 * data to its new content-addressed name, followed by an X_WRITE that stores
 * the new object entry in the map. This callback handles both completions:
 * on X_COPY success it issues the follow-up object_write(); on X_WRITE
 * success it commits the new object name into the map node and wakes
 * waiters. On any failure mio->err is set, mio->copyups is decremented and
 * waiters are signaled.
 */
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        /* the map node this request was registered for by copyup_object/
         * the X_COPY branch below */
        struct map_node *mn = __get_copyup_node(mio, req);
        if (!mn){
                XSEGLOG2(&lc, E, "Cannot get map node");
                goto out_err;
        }
        /* drop the req -> node association; the request is put before return */
        __set_copyup_node(mio, req, NULL);

        if (req->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Req failed");
                mn->flags &= ~MF_OBJECT_COPYING;
                mn->flags &= ~MF_OBJECT_WRITING;
                goto out_err;
        }
        if (req->op == X_WRITE) {
                /* second stage done: the map entry for the new object was written */
                char *target = xseg_get_target(peer->xseg, req);
                (void)target;
                //printf("handle object write replyi\n");
                __set_copyup_node(mio, req, NULL);
                //assert mn->flags & MF_OBJECT_WRITING
                mn->flags &= ~MF_OBJECT_WRITING;
                
                struct map_node tmp;
                char *data = xseg_get_data(peer->xseg, req);
                /* decode the on-disk object entry that was just written */
                map_to_object(&tmp, data);
                mn->flags |= MF_OBJECT_EXIST;
                /* sanity check: after a completed copy-up, EXIST must be the
                 * only flag left on the node */
                if (mn->flags != MF_OBJECT_EXIST){
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
                        goto out_err;
                }
                //assert mn->flags & MF_OBJECT_EXIST
                /* commit the new (content-addressed) object name */
                strncpy(mn->object, tmp.object, tmp.objectlen);
                mn->object[tmp.objectlen] = 0;
                mn->objectlen = tmp.objectlen;
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
                mio->copyups--;
                signal_mapnode(mn);
        } else if (req->op == X_COPY) {
        //        issue write_object;
                /* first stage done: data duplicated, now persist the map entry */
                mn->flags &= ~MF_OBJECT_COPYING;
                struct map *map = mn->map;
                if (!map){
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
                        goto out_err;
                }

                /* construct a tmp map_node for writing purposes */
                char *target = xseg_get_target(peer->xseg, req);
                struct map_node newmn = *mn;
                newmn.flags = MF_OBJECT_EXIST;
                strncpy(newmn.object, target, req->targetlen);
                newmn.object[req->targetlen] = 0;
                newmn.objectlen = req->targetlen;
                newmn.objectidx = mn->objectidx; 
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
                if (!xreq){
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                        "\n\t of map %s [%llu]",
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
                        goto out_err;
                }
                mn->flags |= MF_OBJECT_WRITING;
                /* register the follow-up write so this callback can find mn again */
                __set_copyup_node (mio, xreq, mn);

                XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        } else {
                //wtf??
                ;
        }

out:
        xseg_put_request(peer->xseg, req, pr->portno);
        return;

out_err:
        mio->copyups--;
        XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
        mio->err = 1;
        if (mn)
                signal_mapnode(mn);
        goto out;

}
1258

    
1259
/* One slice of an I/O request mapped onto a single object:
 * mn is the object's map node, <offset, size> the range inside it. */
struct r2o {
        struct map_node *mn;
        uint64_t offset;
        uint64_t size;
};
1264

    
1265
static int req2objs(struct peer_req *pr, struct map *map, int write)
1266
{
1267
        int r = 0;
1268
        struct peerd *peer = pr->peer;
1269
        struct mapper_io *mio = __get_mapper_io(pr);
1270
        char *target = xseg_get_target(peer->xseg, pr->req);
1271
        uint32_t nr_objs = calc_nr_obj(pr->req);
1272
        uint64_t size = sizeof(struct xseg_reply_map) + 
1273
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1274
        uint32_t idx, i, ready;
1275
        uint64_t rem_size, obj_index, obj_offset, obj_size; 
1276
        struct map_node *mn;
1277
        mio->copyups = 0;
1278
        XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1279

    
1280
        /* get map_nodes of request */
1281
        struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1282
        if (!mns){
1283
                XSEGLOG2(&lc, E, "Cannot allocate mns");
1284
                return -1;
1285
        }
1286
        idx = 0;
1287
        rem_size = pr->req->size;
1288
        obj_index = pr->req->offset / block_size;
1289
        obj_offset = pr->req->offset & (block_size -1); //modulo
1290
        obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1291
        mn = get_mapnode(map, obj_index);
1292
        if (!mn) {
1293
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1294
                r = -1;
1295
                goto out;
1296
        }
1297
        mns[idx].mn = mn;
1298
        mns[idx].offset = obj_offset;
1299
        mns[idx].size = obj_size;
1300
        rem_size -= obj_size;
1301
        while (rem_size > 0) {
1302
                idx++;
1303
                obj_index++;
1304
                obj_offset = 0;
1305
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
1306
                rem_size -= obj_size;
1307
                mn = get_mapnode(map, obj_index);
1308
                if (!mn) {
1309
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1310
                        r = -1;
1311
                        goto out;
1312
                }
1313
                mns[idx].mn = mn;
1314
                mns[idx].offset = obj_offset;
1315
                mns[idx].size = obj_size;
1316
        }
1317
        if (write) {
1318
                ready = 0;
1319
                int can_wait = 0;
1320
                mio->cb=copyup_cb;
1321
                while (ready < (idx + 1)){
1322
                        ready = 0;
1323
                        for (i = 0; i < (idx+1); i++) {
1324
                                mn = mns[i].mn;
1325
                                //do copyups
1326
                                if (mn->flags & MF_OBJECT_NOT_READY) {
1327
                                        if (can_wait){
1328
                                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1329
                                                if (mn->flags & MF_OBJECT_DELETED){
1330
                                                        mio->err = 1;
1331
                                                }
1332
                                                if (mio->err){
1333
                                                        XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1334
                                                        if (!mio->copyups){
1335
                                                                r = -1;
1336
                                                                goto out;
1337
                                                        }
1338
                                                }
1339
                                        }
1340
                                }
1341
                                else if (!(mn->flags & MF_OBJECT_EXIST)) {
1342
                                        //calc new_target, copy up object
1343
                                        if (copyup_object(peer, mn, pr) == NULL){
1344
                                                XSEGLOG2(&lc, E, "Error in copy up object");
1345
                                        } else {
1346
                                                mio->copyups++;
1347
                                        }
1348
                                } else {
1349
                                        ready++;
1350
                                }
1351
                        }
1352
                        can_wait = 1;
1353
                }
1354
                /*
1355
pending_copyups:
1356
                while(mio->copyups > 0){
1357
                        mio->cb = copyup_cb;
1358
                        wait_on_pr(pr, 0);
1359
                        ta--;
1360
                        st_cond_wait(pr->cond);
1361
                }
1362
                */
1363
        }
1364

    
1365
        if (mio->err){
1366
                r = -1;
1367
                XSEGLOG2(&lc, E, "Mio->err");
1368
                goto out;
1369
        }
1370

    
1371
        /* resize request to fit reply */
1372
        char buf[XSEG_MAX_TARGETLEN];
1373
        strncpy(buf, target, pr->req->targetlen);
1374
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1375
        if (r < 0) {
1376
                XSEGLOG2(&lc, E, "Cannot resize request");
1377
                goto out;
1378
        }
1379
        target = xseg_get_target(peer->xseg, pr->req);
1380
        strncpy(target, buf, pr->req->targetlen);
1381

    
1382
        /* structure reply */
1383
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1384
        reply->cnt = nr_objs;
1385
        for (i = 0; i < (idx+1); i++) {
1386
                strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1387
                reply->segs[i].targetlen = mns[i].mn->objectlen;
1388
                reply->segs[i].offset = mns[i].offset;
1389
                reply->segs[i].size = mns[i].size;
1390
        }
1391
out:
1392
        for (i = 0; i < idx; i++) {
1393
                put_mapnode(mns[i].mn);
1394
        }
1395
        free(mns);
1396
        return r;
1397
}
1398

    
1399
static int do_dropcache(struct peer_req *pr, struct map *map)
1400
{
1401
        struct map_node *mn;
1402
        struct peerd *peer = pr->peer;
1403
        struct mapperd *mapper = __get_mapperd(peer);
1404
        uint64_t i;
1405
        XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1406
        map->flags |= MF_MAP_DROPPING_CACHE;
1407
        for (i = 0; i < calc_map_obj(map); i++) {
1408
                mn = get_mapnode(map, i);
1409
                if (mn) {
1410
                        if (!(mn->flags & MF_OBJECT_DESTROYED)){
1411
                                //make sure all pending operations on all objects are completed
1412
                                if (mn->flags & MF_OBJECT_NOT_READY){
1413
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1414
                                }
1415
                                mn->flags &= MF_OBJECT_DESTROYED;
1416
                        }
1417
                        put_mapnode(mn);
1418
                }
1419
        }
1420
        map->flags &= ~MF_MAP_DROPPING_CACHE;
1421
        map->flags |= MF_MAP_DESTROYED;
1422
        remove_map(mapper, map);
1423
        XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1424
        put_map(map);        // put map here to destroy it (matches m->ref = 1 on map create)
1425
        return 0;
1426
}
1427

    
1428
static int do_info(struct peer_req *pr, struct map *map)
1429
{
1430
        struct peerd *peer = pr->peer;
1431
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1432
        xinfo->size = map->size;
1433
        return 0;
1434
}
1435

    
1436

    
1437
static int do_close(struct peer_req *pr, struct map *map)
1438
{
1439
//        struct peerd *peer = pr->peer;
1440
//        struct xseg_request *req;
1441
        if (map->flags & MF_MAP_EXCLUSIVE) 
1442
                close_map(pr, map);
1443
        return do_dropcache(pr, map);
1444
}
1445

    
1446
static int do_destroy(struct peer_req *pr, struct map *map)
1447
{
1448
        uint64_t i;
1449
        struct peerd *peer = pr->peer;
1450
        struct mapper_io *mio = __get_mapper_io(pr);
1451
        struct map_node *mn;
1452
        struct xseg_request *req;
1453
        
1454
        XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1455
        map->flags |= MF_MAP_DELETING;
1456
        req = delete_map(pr, map);
1457
        if (!req)
1458
                return -1;
1459
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1460
        if (req->state & XS_FAILED){
1461
                xseg_put_request(peer->xseg, req, pr->portno);
1462
                map->flags &= ~MF_MAP_DELETING;
1463
                return -1;
1464
        }
1465
        xseg_put_request(peer->xseg, req, pr->portno);
1466
        //FIXME
1467
        uint64_t nr_obj = calc_map_obj(map);
1468
        uint64_t deleted = 0;
1469
        while (deleted < nr_obj){ 
1470
                deleted = 0;
1471
                for (i = 0; i < nr_obj; i++){
1472
                        mn = get_mapnode(map, i);
1473
                        if (mn) {
1474
                                if (!(mn->flags & MF_OBJECT_DESTROYED)){
1475
                                        if (mn->flags & MF_OBJECT_EXIST){
1476
                                                //make sure all pending operations on all objects are completed
1477
                                                if (mn->flags & MF_OBJECT_NOT_READY){
1478
                                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1479
                                                }
1480
                                                req = delete_object(pr, mn);
1481
                                                if (!req)
1482
                                                        if (mio->del_pending){
1483
                                                                goto wait_pending;
1484
                                                        } else {
1485
                                                                continue;
1486
                                                        }
1487
                                                else {
1488
                                                        mio->del_pending++;
1489
                                                }
1490
                                        }
1491
                                        mn->flags &= MF_OBJECT_DESTROYED;
1492
                                }
1493
                                put_mapnode(mn);
1494
                        }
1495
                        deleted++;
1496
                }
1497
wait_pending:
1498
                mio->cb = deletion_cb;
1499
                wait_on_pr(pr, mio->del_pending > 0);
1500
        }
1501
        mio->cb = NULL;
1502
        map->flags &= ~MF_MAP_DELETING;
1503
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1504
        return do_close(pr, map);
1505
}
1506

    
1507
static int do_mapr(struct peer_req *pr, struct map *map)
1508
{
1509
        struct peerd *peer = pr->peer;
1510
        int r = req2objs(pr, map, 0);
1511
        if  (r < 0){
1512
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1513
                                map->volume, 
1514
                                (unsigned long long) pr->req->offset, 
1515
                                (unsigned long long) (pr->req->offset + pr->req->size));
1516
                return -1;
1517
        }
1518
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1519
                        map->volume, 
1520
                        (unsigned long long) pr->req->offset, 
1521
                        (unsigned long long) (pr->req->offset + pr->req->size));
1522
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1523
                        (unsigned long long) pr->req->offset,
1524
                        (unsigned long long) pr->req->size);
1525
        char buf[XSEG_MAX_TARGETLEN+1];
1526
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1527
        int i;
1528
        for (i = 0; i < reply->cnt; i++) {
1529
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1530
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1531
                buf[reply->segs[i].targetlen] = 0;
1532
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1533
                                (unsigned long long) reply->segs[i].offset,
1534
                                (unsigned long long) reply->segs[i].size);
1535
        }
1536
        return 0;
1537
}
1538

    
1539
static int do_mapw(struct peer_req *pr, struct map *map)
1540
{
1541
        struct peerd *peer = pr->peer;
1542
        int r = req2objs(pr, map, 1);
1543
        if  (r < 0){
1544
                XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1545
                                map->volume, 
1546
                                (unsigned long long) pr->req->offset, 
1547
                                (unsigned long long) (pr->req->offset + pr->req->size));
1548
                return -1;
1549
        }
1550
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1551
                        map->volume, 
1552
                        (unsigned long long) pr->req->offset, 
1553
                        (unsigned long long) (pr->req->offset + pr->req->size));
1554
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1555
                        (unsigned long long) pr->req->offset,
1556
                        (unsigned long long) pr->req->size);
1557
        char buf[XSEG_MAX_TARGETLEN+1];
1558
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1559
        int i;
1560
        for (i = 0; i < reply->cnt; i++) {
1561
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1562
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1563
                buf[reply->segs[i].targetlen] = 0;
1564
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1565
                                (unsigned long long) reply->segs[i].offset,
1566
                                (unsigned long long) reply->segs[i].size);
1567
        }
1568
        return 0;
1569
}
1570

    
1571
//here map is the parent map
1572
static int do_clone(struct peer_req *pr, struct map *map)
1573
{
1574
        /*
1575
        FIXME check if clone map exists
1576
        clonemap = get_map(pr, target, targetlen, MF_LOAD);
1577
        if (clonemap)
1578
                do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1579
                                        //        cache on non-exclusive opens or declare a NO_CACHE flag ?
1580
                return -1;
1581
        */
1582

    
1583
        int r;
1584
        char buf[XSEG_MAX_TARGETLEN];
1585
        struct peerd *peer = pr->peer;
1586
        struct mapperd *mapper = __get_mapperd(peer);
1587
        char *target = xseg_get_target(peer->xseg, pr->req);
1588
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1589
        XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1590
        struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1591
        if (!clonemap) 
1592
                return -1;
1593

    
1594
        if (xclone->size == -1)
1595
                clonemap->size = map->size;
1596
        else
1597
                clonemap->size = xclone->size;
1598
        if (clonemap->size < map->size){
1599
                target = xseg_get_target(peer->xseg, pr->req);
1600
                strncpy(buf, target, pr->req->targetlen);
1601
                buf[pr->req->targetlen] = 0;
1602
                XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1603
                                "\n\t for requested clone %s",
1604
                                (unsigned long long) xclone->size,
1605
                                (unsigned long long) map->size, buf);
1606
                goto out_err;
1607
        }
1608
        
1609
        //alloc and init map_nodes
1610
        unsigned long c = clonemap->size/block_size + 1;
1611
        struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1612
        if (!map_nodes){
1613
                goto out_err;
1614
        }
1615
        int i;
1616
        for (i = 0; i < clonemap->size/block_size + 1; i++) {
1617
                struct map_node *mn = get_mapnode(map, i);
1618
                if (mn) {
1619
                        strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1620
                        map_nodes[i].objectlen = mn->objectlen;
1621
                        put_mapnode(mn);
1622
                } else {
1623
                        strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1624
                        map_nodes[i].objectlen = strlen(zero_block);
1625
                }
1626
                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1627
                map_nodes[i].flags = 0;
1628
                map_nodes[i].objectidx = i;
1629
                map_nodes[i].map = clonemap;
1630
                map_nodes[i].ref = 1;
1631
                map_nodes[i].waiters = 0;
1632
                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1633
                r = insert_object(clonemap, &map_nodes[i]);
1634
                if (r < 0){
1635
                        XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1636
                        goto out_err;
1637
                }
1638
        }
1639
        r = write_map(pr, clonemap);
1640
        if (r < 0){
1641
                XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1642
                goto out_err;
1643
        }
1644
        return 0;
1645

    
1646
out_err:
1647
        put_map(clonemap);
1648
        return -1;
1649
}
1650

    
1651
/*
 * Open (optionally taking the exclusive lock) and then load a map.
 * With MF_EXCLUSIVE, try to acquire the map lock first; with MF_FORCE,
 * keep retrying the open until it succeeds.
 * Returns load_map's result; on load failure an acquired lock is released.
 */
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
	int r, opened = 0;
retry_open:
	if (flags & MF_EXCLUSIVE){
		r = open_map(pr, map);
		if (r < 0) {
			if (flags & MF_FORCE){
				/* NOTE(review): retries indefinitely until
				 * open_map succeeds — presumably open_map
				 * blocks/yields; confirm this cannot spin. */
				goto retry_open;
			}
			/* without MF_FORCE: fall through and load the map
			 * without holding the exclusive lock */
		} else {
			opened = 1;
		}
	}
	r = load_map(pr, map);
	if (r < 0 && opened){
		/* loading failed: do not keep the lock we just took */
		close_map(pr, map);
	}
	return r;
}
1672

    
1673
/*
 * Look up a map in the cache, optionally loading it (MF_LOAD) when absent.
 * Returns a referenced map (caller must put_map), or NULL on failure or
 * when the map is marked destroyed.
 */
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
{
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	struct map *map = find_map(mapper, name, namelen);

	if (map) {
		/* cached: refuse maps already marked destroyed */
		if (map->flags & MF_MAP_DESTROYED)
			return NULL;
		__get_map(map);
		return map;
	}

	/* not cached: only load from storage when explicitly asked */
	if (!(flags & MF_LOAD))
		return NULL;
	map = create_map(mapper, name, namelen);
	if (!map)
		return NULL;
	if (open_load_map(pr, map, flags) < 0) {
		do_dropcache(pr, map);
		return NULL;
	}
	__get_map(map);
	return map;
}
1699

    
1700
/*
 * Common wrapper for map operations: acquire the map, wait until no other
 * operation is in flight, run `action` on it, then drop the cache entry
 * when the map is not held exclusively and release our reference.
 * Returns the action's result, or -1 when the map cannot be obtained.
 */
static int map_action(int (action)(struct peer_req *pr, struct map *map),
		struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
{
	//struct peerd *peer = pr->peer;
	struct map *map;
start:
	map = get_map(pr, name, namelen, flags);
	if (!map)
		return -1;
	if (map->flags & MF_MAP_NOT_READY){
		/* another operation owns the map: wait, drop our ref, retry */
		wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
		put_map(map);
		goto start;
	}
	int r = action(pr, map);
	//always drop cache if map not read exclusively
	if (!(map->flags & MF_MAP_EXCLUSIVE))
		do_dropcache(pr, map);
	//maybe capture ref before and compare here?
	if (map->ref > 1){
		/* wake waiters that still hold references */
		signal_map(map);
	}
	put_map(map);
	return r;
}
1725

    
1726
void * handle_info(struct peer_req *pr)
1727
{
1728
        struct peerd *peer = pr->peer;
1729
        char *target = xseg_get_target(peer->xseg, pr->req);
1730
        int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
1731
        if (r < 0)
1732
                fail(peer, pr);
1733
        else
1734
                complete(peer, pr);
1735
        ta--;
1736
        return NULL;
1737
}
1738

    
1739
/*
 * st_thread entry point for X_CLONE requests.
 * Two modes:
 *  - xclone->targetlen set: clone an existing (parent) map via do_clone.
 *  - no target but a size: create a brand-new volume of that size whose
 *    objects all point to the zero block.
 * Completes or fails the peer request and decrements the active-thread
 * counter before returning.
 */
void * handle_clone(struct peer_req *pr)
{
	int r;
	struct peerd *peer = pr->peer;
	struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
	if (!xclone) {
		r = -1;
		goto out;
	}
	if (xclone->targetlen){
		/* clone from an existing parent map */
		r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
	} else {
		if (!xclone->size){
			/* neither a parent nor a size: invalid request */
			r = -1;
		} else {
			struct map *map;
			char *target = xseg_get_target(peer->xseg, pr->req);
			XSEGLOG2(&lc, I, "Creating volume");
			/* refuse to create a volume that already exists */
			map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
			if (map){
				XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
				if (map->ref <= 2) //initial one + one ref from __get_map
					do_dropcache(pr, map); //we are the only ones using this map. Drop the cache.
				put_map(map); //matches get_map
				r = -1;
				goto out;
			}
			//create a new empty map of size
			map = create_map(mapper, target, pr->req->targetlen);
			if (!map){
				r = -1;
				goto out;
			}
			map->size = xclone->size;
			//populate_map with zero objects;
			uint64_t nr_objs = xclone->size / block_size;
			if (xclone->size % block_size)
				nr_objs++;

			struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
			if (!map_nodes){
				do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
				r = -1;
				goto out;
			}
			uint64_t i;
			/* every object of a fresh volume references the zero block */
			for (i = 0; i < nr_objs; i++) {
				strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
				map_nodes[i].objectlen = strlen(zero_block);
				map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
				map_nodes[i].flags = 0;
				map_nodes[i].objectidx = i;
				map_nodes[i].map = map;
				map_nodes[i].ref = 1;
				map_nodes[i].waiters = 0;
				map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
				r = insert_object(map, &map_nodes[i]);
				if (r < 0){
					do_dropcache(pr, map);
					r = -1;
					goto out;
				}
			}
			/* persist the new map to storage */
			r = write_map(pr, map);
			if (r < 0){
				XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
				do_dropcache(pr, map);
				goto out;
			}
			XSEGLOG2(&lc, I, "Volume %s created", map->volume);
			r = 0;
			do_dropcache(pr, map); //drop cache here for consistency
		}
	}
out:
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}
1821

    
1822
void * handle_mapr(struct peer_req *pr)
1823
{
1824
        struct peerd *peer = pr->peer;
1825
        char *target = xseg_get_target(peer->xseg, pr->req);
1826
        int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
1827
        if (r < 0)
1828
                fail(peer, pr);
1829
        else
1830
                complete(peer, pr);
1831
        ta--;
1832
        return NULL;
1833
}
1834

    
1835
void * handle_mapw(struct peer_req *pr)
1836
{
1837
        struct peerd *peer = pr->peer;
1838
        char *target = xseg_get_target(peer->xseg, pr->req);
1839
        int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1840
        if (r < 0)
1841
                fail(peer, pr);
1842
        else
1843
                complete(peer, pr);
1844
        XSEGLOG2(&lc, D, "Ta: %d", ta);
1845
        ta--;
1846
        return NULL;
1847
}
1848

    
1849
void * handle_destroy(struct peer_req *pr)
1850
{
1851
        struct peerd *peer = pr->peer;
1852
        char *target = xseg_get_target(peer->xseg, pr->req);
1853
        int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1854
        if (r < 0)
1855
                fail(peer, pr);
1856
        else
1857
                complete(peer, pr);
1858
        ta--;
1859
        return NULL;
1860
}
1861

    
1862
void * handle_close(struct peer_req *pr)
1863
{
1864
        struct peerd *peer = pr->peer;
1865
        char *target = xseg_get_target(peer->xseg, pr->req);
1866
        //here we do not want to load
1867
        int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
1868
        if (r < 0)
1869
                fail(peer, pr);
1870
        else
1871
                complete(peer, pr);
1872
        ta--;
1873
        return NULL;
1874
}
1875

    
1876
int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1877
                        struct xseg_request *req)
1878
{
1879
        //struct mapperd *mapper = __get_mapperd(peer);
1880
        struct mapper_io *mio = __get_mapper_io(pr);
1881
        void *(*action)(struct peer_req *) = NULL;
1882

    
1883
        mio->state = ACCEPTED;
1884
        mio->err = 0;
1885
        mio->cb = NULL;
1886
        switch (pr->req->op) {
1887
                /* primary xseg operations of mapper */
1888
                case X_CLONE: action = handle_clone; break;
1889
                case X_MAPR: action = handle_mapr; break;
1890
                case X_MAPW: action = handle_mapw; break;
1891
//                case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1892
                case X_INFO: action = handle_info; break;
1893
                case X_DELETE: action = handle_destroy; break;
1894
                case X_CLOSE: action = handle_close; break;
1895
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1896
        }
1897
        if (action){
1898
                ta++;
1899
                mio->active = 1;
1900
                st_thread_create(action, pr, 0, 0);
1901
        }
1902
        return 0;
1903

    
1904
}
1905

    
1906
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1907
                enum dispatch_reason reason)
1908
{
1909
        struct mapperd *mapper = __get_mapperd(peer);
1910
        (void) mapper;
1911
        struct mapper_io *mio = __get_mapper_io(pr);
1912
        (void) mio;
1913

    
1914

    
1915
        if (reason == dispatch_accept)
1916
                dispatch_accepted(peer, pr, req);
1917
        else {
1918
                if (mio->cb){
1919
                        mio->cb(pr, req);
1920
                } else { 
1921
                        signal_pr(pr);
1922
                }
1923
        }
1924
        return 0;
1925
}
1926

    
1927
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1928
{
1929
        int i;
1930
        unsigned char buf[SHA256_DIGEST_SIZE];
1931
        unsigned char *zero;
1932

    
1933
        gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1934

    
1935
               /* Version check should be the very first call because it
1936
          makes sure that important subsystems are intialized. */
1937
               gcry_check_version (NULL);
1938
     
1939
               /* Disable secure memory.  */
1940
               gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1941
     
1942
               /* Tell Libgcrypt that initialization has completed. */
1943
               gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1944

    
1945
        /* calculate out magic sha hash value */
1946
        gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1947

    
1948
        /* calculate zero block */
1949
        //FIXME check hash value
1950
        zero = malloc(block_size);
1951
        memset(zero, 0, block_size);
1952
        gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1953
        for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1954
                sprintf(zero_block + 2*i, "%02x", buf[i]);
1955
        printf("%s \n", zero_block);
1956
        free(zero);
1957

    
1958
        //FIXME error checks
1959
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1960
        peer->priv = mapperd;
1961
        mapper = mapperd;
1962
        mapper->hashmaps = xhash_new(3, STRING);
1963
        
1964
        for (i = 0; i < peer->nr_ops; i++) {
1965
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1966
                mio->copyups_nodes = xhash_new(3, INTEGER);
1967
                mio->copyups = 0;
1968
                mio->err = 0;
1969
                mio->active = 0;
1970
                peer->peer_reqs[i].priv = mio;
1971
        }
1972

    
1973
        for (i = 0; i < argc; i++) {
1974
                if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1975
                        mapper->bportno = atoi(argv[i+1]);
1976
                        i += 1;
1977
                        continue;
1978
                }
1979
                if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1980
                        mapper->mbportno = atoi(argv[i+1]);
1981
                        i += 1;
1982
                        continue;
1983
                }
1984
                /* enforce only one thread */
1985
                if (!strcmp(argv[i], "-t") && (i+1) < argc){
1986
                        int t = atoi(argv[i+1]);
1987
                        if (t != 1) {
1988
                                printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1989
                                return -1;
1990
                        }
1991
                        i += 1;
1992
                        continue;
1993
                }
1994
        }
1995

    
1996
        const struct sched_param param = { .sched_priority = 99 };
1997
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1998

    
1999

    
2000
//        test_map(peer);
2001

    
2002
        return 0;
2003
}
2004

    
2005
void print_obj(struct map_node *mn)
2006
{
2007
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2008
                        (unsigned long long) mn->objectidx, mn->object, 
2009
                        (unsigned int) mn->objectlen, 
2010
                        (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2011
}
2012

    
2013
void print_map(struct map *m)
2014
{
2015
        uint64_t nr_objs = m->size/block_size;
2016
        if (m->size % block_size)
2017
                nr_objs++;
2018
        fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2019
                        m->volume, m->volumelen, 
2020
                        (unsigned long long) m->size, 
2021
                        (unsigned long long) nr_objs);
2022
        uint64_t i;
2023
        struct map_node *mn;
2024
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2025
                return;
2026
        for (i = 0; i < nr_objs; i++) {
2027
                mn = find_object(m, i);
2028
                if (!mn){
2029
                        printf("object idx [%llu] not found!\n", (unsigned long long) i);
2030
                        continue;
2031
                }
2032
                print_obj(mn);
2033
        }
2034
}
2035

    
2036
/*
2037
void test_map(struct peerd *peer)
2038
{
2039
        int i,j, ret;
2040
        //struct sha256_ctx sha256ctx;
2041
        unsigned char buf[SHA256_DIGEST_SIZE];
2042
        char buf_new[XSEG_MAX_TARGETLEN + 20];
2043
        struct map *m = malloc(sizeof(struct map));
2044
        strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2045
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2046
        strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2047
        buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2048
        m->volumelen = XSEG_MAX_TARGETLEN;
2049
        m->size = 100*block_size;
2050
        m->objects = xhash_new(3, INTEGER);
2051
        struct map_node *map_node = calloc(100, sizeof(struct map_node));
2052
        for (i = 0; i < 100; i++) {
2053
                sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2054
                gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2055
                
2056
                for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2057
                        sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2058
                }
2059
                map_node[i].objectidx = i;
2060
                map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2061
                map_node[i].flags = MF_OBJECT_EXIST;
2062
                ret = insert_object(m, &map_node[i]);
2063
        }
2064

2065
        char *data = malloc(block_size);
2066
        mapheader_to_map(m, data);
2067
        uint64_t pos = mapheader_size;
2068

2069
        for (i = 0; i < 100; i++) {
2070
                map_node = find_object(m, i);
2071
                if (!map_node){
2072
                        printf("no object node %d \n", i);
2073
                        exit(1);
2074
                }
2075
                object_to_map(data+pos, map_node);
2076
                pos += objectsize_in_map;
2077
        }
2078
//        print_map(m);
2079

2080
        struct map *m2 = malloc(sizeof(struct map));
2081
        strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2082
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2083
        m->volumelen = XSEG_MAX_TARGETLEN;
2084

2085
        m2->objects = xhash_new(3, INTEGER);
2086
        ret = read_map(peer, m2, data);
2087
//        print_map(m2);
2088

2089
        int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2090
        ssize_t r, sum = 0;
2091
        while (sum < block_size) {
2092
                r = write(fd, data + sum, block_size -sum);
2093
                if (r < 0){
2094
                        perror("write");
2095
                        printf("write error\n");
2096
                        exit(1);
2097
                } 
2098
                sum += r;
2099
        }
2100
        close(fd);
2101
        map_node = find_object(m, 0);
2102
        free(map_node);
2103
        free(m);
2104
}
2105
*/