Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-mapperd.c @ 8de1e033

History | View | Annotate | Download (58 kB)

1
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <time.h>
#include <unistd.h>
#include <fcntl.h>
#include <sched.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/syscall.h>
#include <openssl/sha.h>
#include <xseg/xseg.h>
#include <xseg/protocol.h>
#include <xtypes/xlock.h>
#include <xtypes/xhash.h>
#include <peer.h>
17

    
18
/* general mapper flags */
19
#define MF_LOAD         (1 << 0)
20
#define MF_EXCLUSIVE         (1 << 1)
21
#define MF_FORCE         (1 << 2)
22
#define MF_ARCHIP        (1 << 3)
23

    
24
#ifndef SHA256_DIGEST_SIZE
25
#define SHA256_DIGEST_SIZE 32
26
#endif
27
/* hex representation of sha256 value takes up double the sha256 size */
28
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
29

    
30
#define block_size (1<<22) //FIXME this should be defined here?
31

    
32
/* transparency byte + max object len in disk */
33
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
34

    
35
/* Map header contains:
36
 *         map version
37
 *         volume size
38
 */
39
#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
40

    
41

    
42
#define MAPPER_PREFIX "archip_"
43
#define MAPPER_PREFIX_LEN 7
44

    
45
#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
46
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
47

    
48
#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
49
#error         "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
50
#endif
51

    
52
#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
53

    
54
#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
55
#error         "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
56
#endif
57

    
58
#define MAX_VOLUME_SIZE \
59
((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
60

    
61

    
62
char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
63
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
64

    
65
/* dispatch_internal mapper states */
66
enum mapper_state {
67
        ACCEPTED = 0,
68
        WRITING = 1,
69
        COPYING = 2,
70
        DELETING = 3,
71
        DROPPING_CACHE = 4
72
};
73

    
74
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
75

    
76

    
77
/* mapper object flags */
78
#define MF_OBJECT_EXIST                (1 << 0)
79
#define MF_OBJECT_COPYING        (1 << 1)
80
#define MF_OBJECT_WRITING        (1 << 2)
81
#define MF_OBJECT_DELETING        (1 << 3)
82
#define MF_OBJECT_DELETED        (1 << 4)
83
#define MF_OBJECT_DESTROYED        (1 << 5)
84

    
85
#define MF_OBJECT_NOT_READY        (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
86
                                        MF_OBJECT_DELETING)
87
/* In-memory state of a single backing object of a map. */
struct map_node {
        uint32_t flags;                /* MF_OBJECT_* state bits */
        uint32_t objectidx;        /* index of this object within its map */
        uint32_t objectlen;        /* strlen(object) */
        char object[MAX_OBJECT_LEN + 1];         /* NULL terminated string */
        struct map *map;        /* back pointer to the owning map */
        uint32_t ref;                /* reference count */
        uint32_t waiters;        /* nr of threads blocked on cond (see wait_on_mapnode) */
        st_cond_t cond;                /* waited on while MF_OBJECT_NOT_READY */
};
97

    
98

    
99
#define wait_on_pr(__pr, __condition__)         \
100
        while (__condition__){                        \
101
                ta--;                                \
102
                __get_mapper_io(pr)->active = 0;\
103
                XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
104
                st_cond_wait(__pr->cond);        \
105
        }
106

    
107
#define wait_on_mapnode(__mn, __condition__)        \
108
        while (__condition__){                        \
109
                ta--;                                \
110
                __mn->waiters++;                \
111
                XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
112
                        ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
113
                st_cond_wait(__mn->cond);        \
114
        }
115

    
116
#define wait_on_map(__map, __condition__)        \
117
        while (__condition__){                        \
118
                ta--;                                \
119
                __map->waiters++;                \
120
                XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
121
                                   __map, __map->volume, __map->waiters, ta); \
122
                st_cond_wait(__map->cond);        \
123
        }
124

    
125
#define signal_pr(__pr)                                \
126
        do {                                         \
127
                if (!__get_mapper_io(pr)->active){\
128
                        ta++;                        \
129
                        XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
130
                        __get_mapper_io(pr)->active = 1;\
131
                        st_cond_signal(__pr->cond);        \
132
                }                                \
133
        }while(0)
134

    
135
#define signal_map(__map)                        \
136
        do {                                         \
137
                if (__map->waiters) {                \
138
                        ta += 1;                \
139
                        XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
140
                        ta: %u",  __map, __map->volume, __map->waiters, ta); \
141
                        __map->waiters--;        \
142
                        st_cond_signal(__map->cond);        \
143
                }                                \
144
        }while(0)
145

    
146
#define signal_mapnode(__mn)                        \
147
        do {                                         \
148
                if (__mn->waiters) {                \
149
                        ta += __mn->waiters;        \
150
                        XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
151
                        %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
152
                        __mn->waiters = 0;        \
153
                        st_cond_broadcast(__mn->cond);        \
154
                }                                \
155
        }while(0)
156

    
157

    
158
/* map flags */
159
#define MF_MAP_LOADING                (1 << 0)
160
#define MF_MAP_DESTROYED        (1 << 1)
161
#define MF_MAP_WRITING                (1 << 2)
162
#define MF_MAP_DELETING                (1 << 3)
163
#define MF_MAP_DROPPING_CACHE        (1 << 4)
164
#define MF_MAP_EXCLUSIVE        (1 << 5)
165
#define MF_MAP_OPENING                (1 << 6)
166
#define MF_MAP_CLOSING                (1 << 7)
167

    
168
#define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
169
                                        MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
170

    
171
/* In-memory representation of a volume map. */
struct map {
        uint32_t version;        /* on-disk map format version (see read_map) */
        uint32_t flags;                /* MF_MAP_* state bits */
        uint64_t size;                /* volume size in bytes */
        uint32_t volumelen;        /* strlen(volume) */
        char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
        xhash_t *objects;         /* obj_index --> map_node */
        uint32_t ref;                /* reference count */
        uint32_t waiters;        /* nr of threads blocked on cond (see wait_on_map) */
        st_cond_t cond;                /* waited on while MF_MAP_NOT_READY */
};
182

    
183
/* Global mapper peer state, stored in peerd->priv (see __get_mapperd). */
struct mapperd {
        xport bportno;                /* blocker that accesses data */
        xport mbportno;                /* blocker that accesses maps */
        xhash_t *hashmaps; // hash_function(target) --> struct map
};
188

    
189
/* Per-request mapper state, stored in peer_req->priv (see __get_mapper_io). */
struct mapper_io {
        volatile uint32_t copyups;        /* nr of copyups pending, issued by this mapper io */
        xhash_t *copyups_nodes;                /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
        struct map_node *copyup_node;
        volatile int err;                        /* error flag */
        volatile uint64_t del_pending;        /* NOTE(review): presumably nr of outstanding deletions — confirm against delete path */
        uint64_t delobj;        /* NOTE(review): looks like a deletion cursor — confirm */
        uint64_t dcobj;                /* NOTE(review): looks like a drop-cache cursor — confirm */
        cb_t cb;                /* completion callback invoked on sub-request completion */
        enum mapper_state state;        /* dispatch_internal state machine position */
        volatile int active;        /* 0 while the pr is parked in wait_on_pr, 1 otherwise */
};
201

    
202
/* global vars */
203
struct mapperd *mapper;
204

    
205
void print_map(struct map *m);
206

    
207

    
208
/* Print the mapper-specific command line options on stderr. */
void custom_peer_usage()
{
        fprintf(stderr,
                "Custom peer options: \n"
                "-bp  : port for block blocker(!)\n"
                "-mbp : port for map blocker\n"
                "\n");
}
215

    
216

    
217
/*
218
 * Helper functions
219
 */
220

    
221
static inline struct mapperd * __get_mapperd(struct peerd *peer)
222
{
223
        return (struct mapperd *) peer->priv;
224
}
225

    
226
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
227
{
228
        return (struct mapper_io *) pr->priv;
229
}
230

    
231
static inline uint64_t calc_map_obj(struct map *map)
232
{
233
        if (map->size == -1)
234
                return 0;
235
        uint64_t nr_objs = map->size / block_size;
236
        if (map->size % block_size)
237
                nr_objs++;
238
        return nr_objs;
239
}
240

    
241
static uint32_t calc_nr_obj(struct xseg_request *req)
242
{
243
        unsigned int r = 1;
244
        uint64_t rem_size = req->size;
245
        uint64_t obj_offset = req->offset & (block_size -1); //modulo
246
        uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
247
        rem_size -= obj_size;
248
        while (rem_size > 0) {
249
                obj_size = (rem_size > block_size) ? block_size : rem_size;
250
                rem_size -= obj_size;
251
                r++;
252
        }
253

    
254
        return r;
255
}
256

    
257
/* hexlify function. 
258
 * Unsafe. Doesn't check if data length is odd!
259
 */
260
static void hexlify(unsigned char *data, char *hex)
261
{
262
        int i;
263
        for (i=0; i<SHA256_DIGEST_LENGTH; i++)
264
                sprintf(hex+2*i, "%02x", data[i]);
265
}
266

    
267
static void unhexlify(char *hex, unsigned char *data)
268
{
269
        int i;
270
        char c;
271
        for (i=0; i<SHA256_DIGEST_LENGTH; i++){
272
                data[i] = 0;
273
                c = hex[2*i];
274
                if (isxdigit(c)){
275
                        if (isdigit(c)){
276
                                c-= '0';
277
                        }
278
                        else {
279
                                c = tolower(c);
280
                                c = c-'a' + 10;
281
                        }
282
                }
283
                else {
284
                        c = 0;
285
                }
286
                data[i] |= (c << 4) & 0xF0;
287
                c = hex[2*i+1];
288
                if (isxdigit(c)){
289
                        if (isdigit(c)){
290
                                c-= '0';
291
                        }
292
                        else {
293
                                c = tolower(c);
294
                                c = c-'a' + 10;
295
                        }
296
                }
297
                else {
298
                        c = 0;
299
                }
300
                data[i] |= c & 0x0F;
301
        }
302
}
303
/*
304
 * Maps handling functions
305
 */
306

    
307
static struct map * find_map(struct mapperd *mapper, char *volume)
308
{
309
        struct map *m = NULL;
310
        int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
311
                                (xhashidx *) &m);
312
        if (r < 0)
313
                return NULL;
314
        return m;
315
}
316

    
317
static struct map * find_map_len(struct mapperd *mapper, char *target,
318
                                        uint32_t targetlen, uint32_t flags)
319
{
320
        char buf[XSEG_MAX_TARGETLEN+1];
321
        if (flags & MF_ARCHIP){
322
                strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
323
                strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
324
                buf[MAPPER_PREFIX_LEN + targetlen] = 0;
325
                targetlen += MAPPER_PREFIX_LEN;
326
        }
327
        else {
328
                strncpy(buf, target, targetlen);
329
                buf[targetlen] = 0;
330
        }
331

    
332
        if (targetlen > MAX_VOLUME_LEN){
333
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
334
                                        targetlen, MAX_VOLUME_LEN);
335
                return NULL;
336
        }
337

    
338
        XSEGLOG2(&lc, D, "looking up map %s, len %u",
339
                        buf, targetlen);
340
        return find_map(mapper, buf);
341
}
342

    
343

    
344
static int insert_map(struct mapperd *mapper, struct map *map)
345
{
346
        int r = -1;
347

    
348
        if (find_map(mapper, map->volume)){
349
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
350
                goto out;
351
        }
352

    
353
        XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
354
                        map->volume, strlen(map->volume), (unsigned long) map);
355
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
356
        while (r == -XHASH_ERESIZE) {
357
                xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
358
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
359
                if (!new_hashmap){
360
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
361
                                        (unsigned long long) shift);
362
                        goto out;
363
                }
364
                mapper->hashmaps = new_hashmap;
365
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
366
        }
367
out:
368
        return r;
369
}
370

    
371
static int remove_map(struct mapperd *mapper, struct map *map)
372
{
373
        int r = -1;
374

    
375
        //assert no pending pr on map
376

    
377
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
378
        while (r == -XHASH_ERESIZE) {
379
                xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
380
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
381
                if (!new_hashmap){
382
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
383
                                        (unsigned long long) shift);
384
                        goto out;
385
                }
386
                mapper->hashmaps = new_hashmap;
387
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
388
        }
389
out:
390
        return r;
391
}
392

    
393
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
394
{
395
        int r;
396
        xport p;
397
        struct peerd *peer = pr->peer;
398
        struct xseg_request *req;
399
        struct mapperd *mapper = __get_mapperd(peer);
400
        void *dummy;
401

    
402
        XSEGLOG2(&lc, I, "Closing map %s", map->volume);
403

    
404
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
405
        if (!req){
406
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
407
                                map->volume);
408
                goto out_err;
409
        }
410

    
411
        r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
412
        if (r < 0){
413
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
414
                                map->volume);
415
                goto out_put;
416
        }
417

    
418
        char *reqtarget = xseg_get_target(peer->xseg, req);
419
        if (!reqtarget)
420
                goto out_put;
421
        strncpy(reqtarget, map->volume, req->targetlen);
422
        req->op = X_CLOSE;
423
        req->size = 0;
424
        req->offset = 0;
425
        r = xseg_set_req_data(peer->xseg, req, pr);
426
        if (r < 0){
427
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
428
                                map->volume);
429
                goto out_put;
430
        }
431
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
432
        if (p == NoPort){
433
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
434
                                map->volume);
435
                goto out_unset;
436
        }
437
        r = xseg_signal(peer->xseg, p);
438

    
439
        XSEGLOG2(&lc, I, "Map %s closing", map->volume);
440
        return req;
441

    
442
out_unset:
443
        xseg_get_req_data(peer->xseg, req, &dummy);
444
out_put:
445
        xseg_put_request(peer->xseg, req, pr->portno);
446
out_err:
447
        return NULL;
448
}
449

    
450
static int close_map(struct peer_req *pr, struct map *map)
451
{
452
        int err;
453
        struct xseg_request *req;
454
        struct peerd *peer = pr->peer;
455

    
456
        map->flags |= MF_MAP_CLOSING;
457
        req = __close_map(pr, map);
458
        if (!req)
459
                return -1;
460
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
461
        map->flags &= ~MF_MAP_CLOSING;
462
        err = req->state & XS_FAILED;
463
        xseg_put_request(peer->xseg, req, pr->portno);
464
        if (err)
465
                return -1;
466
        return 0;
467
}
468

    
469
/*
470
static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
471
                                char *target, uint32_t targetlen, struct map **m)
472
{
473
        struct mapperd *mapper = __get_mapperd(peer);
474
        int r;
475
        *m = find_map(mapper, target, targetlen);
476
        if (*m) {
477
                XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
478
                if ((*m)->flags & MF_MAP_NOT_READY) {
479
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
480
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
481
                        return MF_PENDING;
482
                //} else if ((*m)->flags & MF_MAP_DESTROYED){
483
                //        return -1;
484
                // 
485
                }else {
486
                        XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
487
                        return 0;
488
                }
489
        }
490
        r = open_map(peer, pr, target, targetlen, 0);
491
        if (r < 0)
492
                return -1; //error
493
        return MF_PENDING;        
494
}
495
*/
496
/*
497
 * Object handling functions
498
 */
499

    
500
struct map_node *find_object(struct map *map, uint64_t obj_index)
501
{
502
        struct map_node *mn;
503
        int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
504
        if (r < 0)
505
                return NULL;
506
        return mn;
507
}
508

    
509
static int insert_object(struct map *map, struct map_node *mn)
510
{
511
        //FIXME no find object first
512
        int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
513
        if (r == -XHASH_ERESIZE) {
514
                unsigned long shift = xhash_grow_size_shift(map->objects);
515
                map->objects = xhash_resize(map->objects, shift, NULL);
516
                if (!map->objects)
517
                        return -1;
518
                r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
519
        }
520
        return r;
521
}
522

    
523

    
524
/*
525
 * map read/write functions 
526
 */
527
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
528
{
529
        hexlify(buf, mn->object);
530
        mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
531
        mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
532
        mn->flags = MF_OBJECT_EXIST;
533
}
534

    
535
static inline void map_to_object(struct map_node *mn, unsigned char *buf)
536
{
537
        char c = buf[0];
538
        mn->flags = 0;
539
        if (c){
540
                mn->flags |= MF_OBJECT_EXIST;
541
                strcpy(mn->object, MAPPER_PREFIX);
542
                hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
543
                mn->object[MAX_OBJECT_LEN] = 0;
544
                mn->objectlen = strlen(mn->object);
545
        }
546
        else {
547
                hexlify(buf+1, mn->object);
548
                mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
549
                mn->objectlen = strlen(mn->object);
550
        }
551

    
552
}
553

    
554
static inline void object_to_map(char* buf, struct map_node *mn)
555
{
556
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
557
        if (buf[0]){
558
                /* strip common prefix */
559
                unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
560
        }
561
        else {
562
                unhexlify(mn->object, (unsigned char *)(buf+1));
563
        }
564
}
565

    
566
static inline void mapheader_to_map(struct map *m, char *buf)
567
{
568
        uint64_t pos = 0;
569
        memcpy(buf + pos, &m->version, sizeof(m->version));
570
        pos += sizeof(m->version);
571
        memcpy(buf + pos, &m->size, sizeof(m->size));
572
        pos += sizeof(m->size);
573
}
574

    
575

    
576
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
577
                                struct map *map, struct map_node *mn)
578
{
579
        void *dummy;
580
        struct mapperd *mapper = __get_mapperd(peer);
581
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
582
                                                        mapper->mbportno, X_ALLOC);
583
        if (!req){
584
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
585
                                "(Map: %s [%llu]",
586
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
587
                goto out_err;
588
        }
589
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
590
        if (r < 0){
591
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
592
                                "(Map: %s [%llu]",
593
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
594
                goto out_put;
595
        }
596
        char *target = xseg_get_target(peer->xseg, req);
597
        strncpy(target, map->volume, req->targetlen);
598
        req->size = objectsize_in_map;
599
        req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
600
        req->op = X_WRITE;
601
        char *data = xseg_get_data(peer->xseg, req);
602
        object_to_map(data, mn);
603

    
604
        r = xseg_set_req_data(peer->xseg, req, pr);
605
        if (r < 0){
606
                XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
607
                                "(Map: %s [%llu]",
608
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
609
                goto out_put;
610
        }
611
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
612
        if (p == NoPort){
613
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
614
                                "(Map: %s [%llu]",
615
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
616
                goto out_unset;
617
        }
618
        r = xseg_signal(peer->xseg, p);
619
        if (r < 0)
620
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
621

    
622
        XSEGLOG2(&lc, I, "Writing object %s \n\t"
623
                        "Map: %s [%llu]",
624
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
625

    
626
        return req;
627

    
628
out_unset:
629
        xseg_get_req_data(peer->xseg, req, &dummy);
630
out_put:
631
        xseg_put_request(peer->xseg, req, pr->portno);
632
out_err:
633
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
634
                        "(Map: %s [%llu]",
635
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
636
        return NULL;
637
}
638

    
639
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
640
{
641
        void *dummy;
642
        struct peerd *peer = pr->peer;
643
        struct mapperd *mapper = __get_mapperd(peer);
644
        struct map_node *mn;
645
        uint64_t i, pos, max_objidx = calc_map_obj(map);
646
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
647
                                                        mapper->mbportno, X_ALLOC);
648
        if (!req){
649
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
650
                goto out_err;
651
        }
652
        int r = xseg_prep_request(peer->xseg, req, map->volumelen,
653
                        mapheader_size + max_objidx * objectsize_in_map);
654
        if (r < 0){
655
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
656
                goto out_put;
657
        }
658
        char *target = xseg_get_target(peer->xseg, req);
659
        strncpy(target, map->volume, req->targetlen);
660
        char *data = xseg_get_data(peer->xseg, req);
661
        mapheader_to_map(map, data);
662
        pos = mapheader_size;
663
        req->op = X_WRITE;
664
        req->size = req->datalen;
665
        req->offset = 0;
666

    
667
        if (map->size % block_size)
668
                max_objidx++;
669
        for (i = 0; i < max_objidx; i++) {
670
                mn = find_object(map, i);
671
                if (!mn){
672
                        XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
673
                                        (unsigned long long) i, map->volume);
674
                        goto out_put;
675
                }
676
                object_to_map(data+pos, mn);
677
                pos += objectsize_in_map;
678
        }
679
        r = xseg_set_req_data(peer->xseg, req, pr);
680
        if (r < 0){
681
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
682
                                map->volume);
683
                goto out_put;
684
        }
685
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
686
        if (p == NoPort){
687
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
688
                                map->volume);
689
                goto out_unset;
690
        }
691
        r = xseg_signal(peer->xseg, p);
692
        if (r < 0)
693
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
694

    
695
        map->flags |= MF_MAP_WRITING;
696
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
697
        return req;
698

    
699
out_unset:
700
        xseg_get_req_data(peer->xseg, req, &dummy);
701
out_put:
702
        xseg_put_request(peer->xseg, req, pr->portno);
703
out_err:
704
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
705
        return NULL;
706
}
707

    
708
static int write_map(struct peer_req* pr, struct map *map)
709
{
710
        int r = 0;
711
        struct peerd *peer = pr->peer;
712
        map->flags |= MF_MAP_WRITING;
713
        struct xseg_request *req = __write_map(pr, map);
714
        if (!req)
715
                return -1;
716
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
717
        if (req->state & XS_FAILED)
718
                r = -1;
719
        xseg_put_request(peer->xseg, req, pr->portno);
720
        map->flags &= ~MF_MAP_WRITING;
721
        return r;
722
}
723

    
724
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
725
{
726
        int r;
727
        xport p;
728
        struct xseg_request *req;
729
        struct peerd *peer = pr->peer;
730
        struct mapperd *mapper = __get_mapperd(peer);
731
        void *dummy;
732

    
733
        XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
734

    
735
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
736
        if (!req){
737
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
738
                                m->volume);
739
                goto out_fail;
740
        }
741

    
742
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
743
        if (r < 0){
744
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
745
                                m->volume);
746
                goto out_put;
747
        }
748

    
749
        char *reqtarget = xseg_get_target(peer->xseg, req);
750
        if (!reqtarget)
751
                goto out_put;
752
        strncpy(reqtarget, m->volume, req->targetlen);
753
        req->op = X_READ;
754
        req->size = block_size;
755
        req->offset = 0;
756
        r = xseg_set_req_data(peer->xseg, req, pr);
757
        if (r < 0){
758
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
759
                                m->volume);
760
                goto out_put;
761
        }
762
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
763
        if (p == NoPort){ 
764
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
765
                                m->volume);
766
                goto out_unset;
767
        }
768
        r = xseg_signal(peer->xseg, p);
769
        
770
        XSEGLOG2(&lc, I, "Map %s loading", m->volume);
771
        return req;
772

    
773
out_unset:
774
        xseg_get_req_data(peer->xseg, req, &dummy);
775
out_put:
776
        xseg_put_request(peer->xseg, req, pr->portno);
777
out_fail:
778
        return NULL;
779
}
780

    
781
static int read_map (struct map *map, unsigned char *buf)
782
{
783
        char nulls[SHA256_DIGEST_SIZE];
784
        memset(nulls, 0, SHA256_DIGEST_SIZE);
785

    
786
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
787
        if (r) {
788
                XSEGLOG2(&lc, D, "Read zeros");
789
                //read error;
790
                return -1;
791
        }
792
        //type 1, archip type, type 0 pithos map
793
        int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
794
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
795
        uint64_t pos;
796
        uint64_t i, nr_objs;
797
        struct map_node *map_node;
798
        if (type) {
799
                uint64_t pos = 0;
800
                map->version = *(uint32_t *) (buf + pos);
801
                pos += sizeof(uint32_t);
802
                map->size = *(uint64_t *) (buf + pos);
803
                pos += sizeof(uint64_t);
804
                nr_objs = map->size / block_size;
805
                if (map->size % block_size)
806
                        nr_objs++;
807
                map_node = calloc(nr_objs, sizeof(struct map_node));
808
                if (!map_node)
809
                        return -1;
810

    
811
                for (i = 0; i < nr_objs; i++) {
812
                        map_node[i].map = map;
813
                        map_node[i].objectidx = i;
814
                        map_node[i].waiters = 0;
815
                        map_node[i].ref = 1;
816
                        map_node[i].cond = st_cond_new(); //FIXME err check;
817
                        map_to_object(&map_node[i], buf + pos);
818
                        pos += objectsize_in_map;
819
                        r = insert_object(map, &map_node[i]); //FIXME error check
820
                }
821
        } else {
822
                pos = 0;
823
                uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
824
                map_node = calloc(max_nr_objs, sizeof(struct map_node));
825
                if (!map_node)
826
                        return -1;
827
                for (i = 0; i < max_nr_objs; i++) {
828
                        if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
829
                                break;
830
                        map_node[i].objectidx = i;
831
                        map_node[i].map = map;
832
                        map_node[i].waiters = 0;
833
                        map_node[i].ref = 1;
834
                        map_node[i].cond = st_cond_new(); //FIXME err check;
835
                        pithosmap_to_object(&map_node[i], buf + pos);
836
                        pos += SHA256_DIGEST_SIZE;
837
                        r = insert_object(map, &map_node[i]); //FIXME error check
838
                }
839
                map->size = i * block_size;
840
        }
841
        print_map(map);
842
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
843
        return 0;
844

    
845
        //FIXME cleanup on error
846
}
847

    
848
static int load_map(struct peer_req *pr, struct map *map)
849
{
850
        int r = 0;
851
        struct xseg_request *req;
852
        struct peerd *peer = pr->peer;
853
        map->flags |= MF_MAP_LOADING;
854
        req = __load_map(pr, map);
855
        if (!req)
856
                return -1;
857
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
858
        map->flags &= ~MF_MAP_LOADING;
859
        if (req->state & XS_FAILED){
860
                XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
861
                xseg_put_request(peer->xseg, req, pr->portno);
862
                return -1;
863
        }
864
        r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
865
        xseg_put_request(peer->xseg, req, pr->portno);
866
        return r;
867
}
868

    
869
/*
 * Submit an X_OPEN request for map m to the mapper block port (mbportno).
 *
 * Unless MF_FORCE is set in flags, the request is flagged XF_NOSYNC
 * (best-effort / non-blocking open semantics on the target peer).
 *
 * Returns the submitted request (caller waits on it and puts it back),
 * or NULL on any setup/submission failure. Uses the usual goto-chain to
 * unwind: unset req data -> put request -> fail.
 */
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
                                                uint32_t flags)
{
        int r;
        xport p;
        struct xseg_request *req;
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        void *dummy;

        XSEGLOG2(&lc, I, "Opening map %s", m->volume);

        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                m->volume);
                goto out_fail;
        }

        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
                                m->volume);
                goto out_put;
        }

        char *reqtarget = xseg_get_target(peer->xseg, req);
        if (!reqtarget)
                goto out_put;
        strncpy(reqtarget, m->volume, req->targetlen);
        req->op = X_OPEN;
        req->size = block_size;
        req->offset = 0;
        /* without MF_FORCE, ask for a non-syncing (best effort) open */
        if (!(flags & MF_FORCE))
                req->flags = XF_NOSYNC;
        /* stash pr so the reply dispatcher can find this peer request */
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
                                m->volume);
                goto out_put;
        }
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){ 
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                m->volume);
                goto out_unset;
        }
        /* NOTE(review): xseg_signal result is ignored here, as elsewhere
         * in this file — presumably a missed signal only delays service */
        r = xseg_signal(peer->xseg, p);
        
        XSEGLOG2(&lc, I, "Map %s opening", m->volume);
        return req;

out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_fail:
        return NULL;
}
928

    
929
/*
 * Open (acquire) map exclusively.
 *
 * Marks the map MF_MAP_OPENING while the X_OPEN request issued by
 * __open_map() is in flight, and sets MF_MAP_EXCLUSIVE on success.
 *
 * Returns 0 on success, -1 on failure.
 *
 * Fix: the original left MF_MAP_OPENING set when __open_map() failed to
 * produce a request, permanently marking the map as opening.
 */
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
        int err;
        struct xseg_request *req;
        struct peerd *peer = pr->peer;

        map->flags |= MF_MAP_OPENING;
        req = __open_map(pr, map, flags);
        if (!req){
                /* clear the transient flag before bailing out */
                map->flags &= ~MF_MAP_OPENING;
                return -1;
        }
        /* block until the open request completes (served or failed) */
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        map->flags &= ~MF_MAP_OPENING;
        err = req->state & XS_FAILED;
        xseg_put_request(peer->xseg, req, pr->portno);
        if (err)
                return -1;
        else 
                map->flags |= MF_MAP_EXCLUSIVE;
        return 0;
}
949

    
950
/*
951
 * copy up functions
952
 */
953

    
954
/*
 * Associate (mn != NULL) or disassociate (mn == NULL) a map node with an
 * in-flight xseg request in the per-io copyup hashmap, so that reply
 * callbacks (copyup_cb / deletion_cb) can recover the node from the
 * request pointer.
 *
 * On hash full/sparse (-XHASH_ERESIZE) the table is grown/shrunk once
 * and the operation retried. Returns the xhash result code; if the
 * resize allocation fails, the original error code is returned.
 */
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
{
        int r = 0;
        if (mn){
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
                if (r == -XHASH_ERESIZE) {
                        /* table full: grow once, then retry the insert */
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
                        if (!new_hashmap)
                                goto out;
                        mio->copyups_nodes = new_hashmap;
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
                }
        }
        else {
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
                if (r == -XHASH_ERESIZE) {
                        /* table too sparse: shrink once, then retry the delete */
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
                        if (!new_hashmap)
                                goto out;
                        mio->copyups_nodes = new_hashmap;
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
                }
        }
out:
        return r;
}
982

    
983
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
984
{
985
        struct map_node *mn;
986
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
987
        if (r < 0)
988
                return NULL;
989
        return mn;
990
}
991

    
992
/*
 * Copy-on-write an object before its first write.
 *
 * Derives the new object name deterministically as
 * MAPPER_PREFIX + hexlified SHA256("<volume>_<objectidx>"), then:
 *  - if the current object is the zero block, skips the copy and goes
 *    straight to writing the new object name into the map
 *    (copyup_zeroblock path, MF_OBJECT_WRITING);
 *  - otherwise issues an X_COPY request to the blocker port and marks
 *    the node MF_OBJECT_COPYING. copyup_cb() picks up the reply.
 *
 * Returns the in-flight request, or NULL on failure.
 */
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
{
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map *map = mn->map;
        void *dummy;
        int r = -1;
        xport p;

        uint32_t newtargetlen;
        char new_target[MAX_OBJECT_LEN + 1];
        unsigned char sha[SHA256_DIGEST_SIZE];

        strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);

        /* name material: "<volume>_<objectidx>", hashed to a fixed-size id */
        char tmp[XSEG_MAX_TARGETLEN + 1];
        uint32_t tmplen;
        strncpy(tmp, map->volume, map->volumelen);
        sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
        tmp[XSEG_MAX_TARGETLEN] = 0;
        tmplen = strlen(tmp);
        SHA256((unsigned char *)tmp, tmplen, sha);
        hexlify(sha, new_target+MAPPER_PREFIX_LEN);
        newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;


        /* nothing to copy from the zero block; just record the new object */
        if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
                goto copyup_zeroblock;

        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
                                                mapper->bportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
                goto out_err;
        }
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
                                sizeof(struct xseg_request_copy));
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
                goto out_put;
        }

        /* request target is the NEW object; the source goes in the payload */
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, new_target, req->targetlen);

        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
        strncpy(xcopy->target, mn->object, mn->objectlen);
        xcopy->targetlen = mn->objectlen;

        req->offset = 0;
        req->size = block_size;
        req->op = X_COPY;
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r<0){
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
                goto out_put;
        }
        /* NOTE(review): return value of __set_copyup_node is not checked;
         * a failed insert would orphan the reply in copyup_cb — confirm */
        r = __set_copyup_node(mio, req, mn);
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort) {
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
                goto out_unset;
        }
        xseg_signal(peer->xseg, p);
//        mio->copyups++;

        mn->flags |= MF_OBJECT_COPYING;
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
        return req;

out_unset:
        r = __set_copyup_node(mio, req, NULL);
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_err:
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
        return NULL;

copyup_zeroblock:
        XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
                        "Proceeding in writing the new object in map");
        /* construct a tmp map_node for writing purposes */
        struct map_node newmn = *mn;
        newmn.flags = MF_OBJECT_EXIST;
        strncpy(newmn.object, new_target, newtargetlen);
        newmn.object[newtargetlen] = 0;
        newmn.objectlen = newtargetlen;
        newmn.objectidx = mn->objectidx; 
        req = object_write(peer, pr, map, &newmn);
        /* NOTE(review): __set_copyup_node is called before the NULL check
         * on req — on object_write failure this registers a NULL key;
         * verify this is harmless */
        r = __set_copyup_node(mio, req, mn);
        if (!req){
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                "\n\t of map %s [%llu]",
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                return NULL;
        }
        mn->flags |= MF_OBJECT_WRITING;
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        return req;
}
1093

    
1094
/*
 * Issue an asynchronous X_DELETE for a single object to the blocker
 * port. The map node is registered in the copyup hashmap so that
 * deletion_cb() can locate it when the reply arrives.
 *
 * Returns the in-flight request, or NULL on failure (goto-chain unwind).
 */
static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
{
        void *dummy;
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->bportno, X_ALLOC);
        XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
                goto out_err;
        }
        /* no payload needed for a delete: datalen 0 */
        int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
                goto out_put;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, mn->object, req->targetlen);
        req->op = X_DELETE;
        req->size = req->datalen;
        req->offset = 0;
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
                goto out_put;
        }
        /* let deletion_cb() recover mn from the request */
        __set_copyup_node(mio, req, mn);
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
                goto out_unset;
        }
        r = xseg_signal(peer->xseg, p);
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
        return req;

out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_err:
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
        return NULL;
}
1140

    
1141
/*
 * Issue an asynchronous X_DELETE for the map block itself, to the
 * mapper block port (mbportno). Clears any copyup-hash entry for the
 * request and marks the map MF_MAP_DELETING on successful submission.
 *
 * Returns the in-flight request, or NULL on failure (goto-chain unwind).
 */
static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
{
        void *dummy;
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        struct mapper_io *mio = __get_mapper_io(pr);
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
                                                        mapper->mbportno, X_ALLOC);
        XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
                goto out_err;
        }
        /* no payload needed for a delete: datalen 0 */
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
                goto out_put;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, map->volume, req->targetlen);
        req->op = X_DELETE;
        req->size = req->datalen;
        req->offset = 0;
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
                goto out_put;
        }
        /* no map node associated with a whole-map delete */
        __set_copyup_node(mio, req, NULL);
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
                goto out_unset;
        }
        r = xseg_signal(peer->xseg, p);
        map->flags |= MF_MAP_DELETING;
        XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
        return req;

out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_err:
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
        return  NULL;
}
1188

    
1189

    
1190
/*
 * Fetch the map node at the given object index and take a reference
 * on it. Returns NULL when the index has no node.
 */
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
{
        struct map_node *node = find_object(map, index);
        if (!node)
                return NULL;
        node->ref++;
        return node;
}
1197

    
1198
static inline void put_mapnode(struct map_node *mn)
1199
{
1200
        mn->ref--;
1201
        if (!mn->ref){
1202
                //clean up mn
1203
                st_cond_destroy(mn->cond);
1204
        }
1205
}
1206

    
1207
static inline void __get_map(struct map *map)
1208
{
1209
        map->ref++;
1210
}
1211

    
1212
static inline void put_map(struct map *map)
1213
{
1214
        struct map_node *mn;
1215
        map->ref--;
1216
        if (!map->ref){
1217
                XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1218
                //clean up map
1219
                uint64_t i;
1220
                for (i = 0; i < calc_map_obj(map); i++) {
1221
                        mn = get_mapnode(map, i);
1222
                        if (mn) {
1223
                                //make sure all pending operations on all objects are completed
1224
                                if (mn->flags & MF_OBJECT_NOT_READY){
1225
                                        //this should never happen...
1226
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1227
                                }
1228
                                mn->flags &= MF_OBJECT_DESTROYED;
1229
                                put_mapnode(mn); //matchin mn->ref = 1 on mn init
1230
                                put_mapnode(mn); //matcing get_mapnode;
1231
                                //assert mn->ref == 0;
1232
                        }
1233
                }
1234
                mn = find_object(map, 0);
1235
                if (mn)
1236
                        free(mn);
1237
                XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1238
                free(map);
1239
        }
1240
}
1241

    
1242
/*
 * Allocate and register a new in-memory map.
 *
 * With MF_ARCHIP the volume name is prefixed with MAPPER_PREFIX and
 * the map version is 1; otherwise the bare name is used and version 0
 * (pithos-style maps). The map starts with size -1 (unknown, set by
 * load), ref 1, no flags, and an empty object hashmap.
 *
 * Returns the new map, or NULL on failure (name too long, allocation
 * failure, or insert_map failure).
 */
static struct map * create_map(struct mapperd *mapper, char *name,
                                uint32_t namelen, uint32_t flags)
{
        int r;
        if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
                                        namelen, MAX_VOLUME_LEN);
                return NULL;
        }
        struct map *m = malloc(sizeof(struct map));
        if (!m){
                XSEGLOG2(&lc, E, "Cannot allocate map ");
                goto out_err;
        }
        m->size = -1;  /* unknown until the map is loaded */
        if (flags & MF_ARCHIP){
                strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
                strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
                m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
                m->volumelen = MAPPER_PREFIX_LEN + namelen;
                m->version = 1; /* keep this hardcoded for now */
        }
        else {
                strncpy(m->volume, name, namelen);
                m->volume[namelen] = 0;
                m->volumelen = namelen;
                m->version = 0; /* version 0 should be pithos maps */
        }
        m->flags = 0;
        m->objects = xhash_new(3, INTEGER); 
        if (!m->objects){
                XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
                                m->volume);
                goto out_map;
        }
        m->ref = 1;
        m->waiters = 0;
        m->cond = st_cond_new(); //FIXME err check;
        r = insert_map(mapper, m);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
                goto out_hash;
        }

        return m;

out_hash:
        xhash_free(m->objects);
out_map:
        XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
        free(m);
out_err:
        return NULL;
}
1296

    
1297

    
1298

    
1299
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1300
{
1301
        struct peerd *peer = pr->peer;
1302
        struct mapperd *mapper = __get_mapperd(peer);
1303
        (void)mapper;
1304
        struct mapper_io *mio = __get_mapper_io(pr);
1305
        struct map_node *mn = __get_copyup_node(mio, req);
1306

    
1307
        mio->del_pending--;
1308
        if (req->state & XS_FAILED){
1309
                mio->err = 1;
1310
        }
1311
        signal_mapnode(mn);
1312
        xseg_put_request(peer->xseg, req, pr->portno);
1313
        signal_pr(pr);
1314
}
1315

    
1316
/*
 * Completion callback for the two-phase copy-on-write: first an X_COPY
 * of the old object to the new name, then an X_WRITE of the new name
 * into the map block.
 *
 *  - X_COPY reply: clears MF_OBJECT_COPYING, issues the follow-up map
 *    write via object_write(), marks MF_OBJECT_WRITING and re-registers
 *    the node under the new request.
 *  - X_WRITE reply: clears MF_OBJECT_WRITING, installs the new object
 *    name in the node, sets MF_OBJECT_EXIST, decrements mio->copyups
 *    and wakes node waiters.
 *
 * On any failure: mio->err is set, mio->copyups decremented, node
 * waiters woken. The request is always put back before returning.
 */
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn = __get_copyup_node(mio, req);
        if (!mn){
                XSEGLOG2(&lc, E, "Cannot get map node");
                goto out_err;
        }
        __set_copyup_node(mio, req, NULL);

        if (req->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Req failed");
                mn->flags &= ~MF_OBJECT_COPYING;
                mn->flags &= ~MF_OBJECT_WRITING;
                goto out_err;
        }
        if (req->op == X_WRITE) {
                char *target = xseg_get_target(peer->xseg, req);
                (void)target;
                //printf("handle object write replyi\n");
                /* NOTE(review): the hash entry was already removed above;
                 * this second removal looks redundant — confirm */
                __set_copyup_node(mio, req, NULL);
                //assert mn->flags & MF_OBJECT_WRITING
                mn->flags &= ~MF_OBJECT_WRITING;

                /* decode the object entry we just wrote back from the
                 * request payload */
                struct map_node tmp;
                char *data = xseg_get_data(peer->xseg, req);
                map_to_object(&tmp, (unsigned char *) data);
                mn->flags |= MF_OBJECT_EXIST;
                /* NOTE(review): this compares the WHOLE flag word against
                 * MF_OBJECT_EXIST, i.e. it asserts no other flag is set —
                 * confirm that is the intent and not a missing `&` */
                if (mn->flags != MF_OBJECT_EXIST){
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
                        goto out_err;
                }
                //assert mn->flags & MF_OBJECT_EXIST
                strncpy(mn->object, tmp.object, tmp.objectlen);
                mn->object[tmp.objectlen] = 0;
                mn->objectlen = tmp.objectlen;
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
                mio->copyups--;
                signal_mapnode(mn);
        } else if (req->op == X_COPY) {
        //        issue write_object;
                mn->flags &= ~MF_OBJECT_COPYING;
                struct map *map = mn->map;
                if (!map){
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
                        goto out_err;
                }

                /* construct a tmp map_node for writing purposes */
                char *target = xseg_get_target(peer->xseg, req);
                struct map_node newmn = *mn;
                newmn.flags = MF_OBJECT_EXIST;
                strncpy(newmn.object, target, req->targetlen);
                newmn.object[req->targetlen] = 0;
                newmn.objectlen = req->targetlen;
                newmn.objectidx = mn->objectidx; 
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
                if (!xreq){
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                        "\n\t of map %s [%llu]",
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
                        goto out_err;
                }
                mn->flags |= MF_OBJECT_WRITING;
                /* phase two in flight: track mn under the new request */
                __set_copyup_node (mio, xreq, mn);

                XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        } else {
                //wtf?? unexpected op on a copyup-tracked request
                ;
        }

out:
        xseg_put_request(peer->xseg, req, pr->portno);
        return;

out_err:
        mio->copyups--;
        XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
        mio->err = 1;
        if (mn)
                signal_mapnode(mn);
        goto out;

}
1404

    
1405
/* request-to-object mapping: one entry per object spanned by an I/O request */
struct r2o {
        struct map_node *mn;    /* referenced map node for this object */
        uint64_t offset;        /* offset of the I/O within the object */
        uint64_t size;          /* bytes of the I/O falling in this object */
};
1410

    
1411
static int req2objs(struct peer_req *pr, struct map *map, int write)
1412
{
1413
        int r = 0;
1414
        struct peerd *peer = pr->peer;
1415
        struct mapper_io *mio = __get_mapper_io(pr);
1416
        char *target = xseg_get_target(peer->xseg, pr->req);
1417
        uint32_t nr_objs = calc_nr_obj(pr->req);
1418
        uint64_t size = sizeof(struct xseg_reply_map) + 
1419
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1420
        uint32_t idx, i, ready;
1421
        uint64_t rem_size, obj_index, obj_offset, obj_size; 
1422
        struct map_node *mn;
1423
        mio->copyups = 0;
1424
        XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1425

    
1426
        /* get map_nodes of request */
1427
        struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1428
        if (!mns){
1429
                XSEGLOG2(&lc, E, "Cannot allocate mns");
1430
                return -1;
1431
        }
1432
        idx = 0;
1433
        rem_size = pr->req->size;
1434
        obj_index = pr->req->offset / block_size;
1435
        obj_offset = pr->req->offset & (block_size -1); //modulo
1436
        obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1437
        mn = get_mapnode(map, obj_index);
1438
        if (!mn) {
1439
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1440
                r = -1;
1441
                goto out;
1442
        }
1443
        mns[idx].mn = mn;
1444
        mns[idx].offset = obj_offset;
1445
        mns[idx].size = obj_size;
1446
        rem_size -= obj_size;
1447
        while (rem_size > 0) {
1448
                idx++;
1449
                obj_index++;
1450
                obj_offset = 0;
1451
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
1452
                rem_size -= obj_size;
1453
                mn = get_mapnode(map, obj_index);
1454
                if (!mn) {
1455
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1456
                        r = -1;
1457
                        goto out;
1458
                }
1459
                mns[idx].mn = mn;
1460
                mns[idx].offset = obj_offset;
1461
                mns[idx].size = obj_size;
1462
        }
1463
        if (write) {
1464
                ready = 0;
1465
                int can_wait = 0;
1466
                mio->cb=copyup_cb;
1467
                while (ready < (idx + 1)){
1468
                        ready = 0;
1469
                        for (i = 0; i < (idx+1); i++) {
1470
                                mn = mns[i].mn;
1471
                                //do copyups
1472
                                if (mn->flags & MF_OBJECT_NOT_READY) {
1473
                                        if (can_wait){
1474
                                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1475
                                                if (mn->flags & MF_OBJECT_DELETED){
1476
                                                        mio->err = 1;
1477
                                                }
1478
                                                if (mio->err){
1479
                                                        XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1480
                                                        if (!mio->copyups){
1481
                                                                r = -1;
1482
                                                                goto out;
1483
                                                        }
1484
                                                }
1485
                                        }
1486
                                }
1487
                                else if (!(mn->flags & MF_OBJECT_EXIST)) {
1488
                                        //calc new_target, copy up object
1489
                                        if (copyup_object(peer, mn, pr) == NULL){
1490
                                                XSEGLOG2(&lc, E, "Error in copy up object");
1491
                                                mio->err = 1;
1492
                                        } else {
1493
                                                mio->copyups++;
1494
                                        }
1495
                                } else {
1496
                                        ready++;
1497
                                }
1498
                        }
1499
                        can_wait = 1;
1500
                }
1501
                /*
1502
pending_copyups:
1503
                while(mio->copyups > 0){
1504
                        mio->cb = copyup_cb;
1505
                        wait_on_pr(pr, 0);
1506
                        ta--;
1507
                        st_cond_wait(pr->cond);
1508
                }
1509
                */
1510
        }
1511

    
1512
        if (mio->err){
1513
                r = -1;
1514
                XSEGLOG2(&lc, E, "Mio->err");
1515
                goto out;
1516
        }
1517

    
1518
        /* resize request to fit reply */
1519
        char buf[XSEG_MAX_TARGETLEN];
1520
        strncpy(buf, target, pr->req->targetlen);
1521
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1522
        if (r < 0) {
1523
                XSEGLOG2(&lc, E, "Cannot resize request");
1524
                goto out;
1525
        }
1526
        target = xseg_get_target(peer->xseg, pr->req);
1527
        strncpy(target, buf, pr->req->targetlen);
1528

    
1529
        /* structure reply */
1530
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1531
        reply->cnt = nr_objs;
1532
        for (i = 0; i < (idx+1); i++) {
1533
                strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1534
                reply->segs[i].targetlen = mns[i].mn->objectlen;
1535
                reply->segs[i].offset = mns[i].offset;
1536
                reply->segs[i].size = mns[i].size;
1537
        }
1538
out:
1539
        for (i = 0; i < idx; i++) {
1540
                put_mapnode(mns[i].mn);
1541
        }
1542
        free(mns);
1543
        return r;
1544
}
1545

    
1546
static int do_dropcache(struct peer_req *pr, struct map *map)
1547
{
1548
        struct map_node *mn;
1549
        struct peerd *peer = pr->peer;
1550
        struct mapperd *mapper = __get_mapperd(peer);
1551
        uint64_t i;
1552
        XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1553
        map->flags |= MF_MAP_DROPPING_CACHE;
1554
        for (i = 0; i < calc_map_obj(map); i++) {
1555
                mn = get_mapnode(map, i);
1556
                if (mn) {
1557
                        if (!(mn->flags & MF_OBJECT_DESTROYED)){
1558
                                //make sure all pending operations on all objects are completed
1559
                                if (mn->flags & MF_OBJECT_NOT_READY){
1560
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1561
                                }
1562
                                mn->flags &= MF_OBJECT_DESTROYED;
1563
                        }
1564
                        put_mapnode(mn);
1565
                }
1566
        }
1567
        map->flags &= ~MF_MAP_DROPPING_CACHE;
1568
        map->flags |= MF_MAP_DESTROYED;
1569
        remove_map(mapper, map);
1570
        XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1571
        put_map(map);        // put map here to destroy it (matches m->ref = 1 on map create)
1572
        return 0;
1573
}
1574

    
1575
static int do_info(struct peer_req *pr, struct map *map)
1576
{
1577
        struct peerd *peer = pr->peer;
1578
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1579
        xinfo->size = map->size;
1580
        return 0;
1581
}
1582

    
1583

    
1584
static int do_close(struct peer_req *pr, struct map *map)
1585
{
1586
//        struct peerd *peer = pr->peer;
1587
//        struct xseg_request *req;
1588
        if (map->flags & MF_MAP_EXCLUSIVE) 
1589
                close_map(pr, map);
1590
        return do_dropcache(pr, map);
1591
}
1592

    
1593
static int do_destroy(struct peer_req *pr, struct map *map)
1594
{
1595
        uint64_t i;
1596
        struct peerd *peer = pr->peer;
1597
        struct mapper_io *mio = __get_mapper_io(pr);
1598
        struct map_node *mn;
1599
        struct xseg_request *req;
1600
        
1601
        XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1602
        map->flags |= MF_MAP_DELETING;
1603
        req = delete_map(pr, map);
1604
        if (!req)
1605
                return -1;
1606
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1607
        if (req->state & XS_FAILED){
1608
                xseg_put_request(peer->xseg, req, pr->portno);
1609
                map->flags &= ~MF_MAP_DELETING;
1610
                return -1;
1611
        }
1612
        xseg_put_request(peer->xseg, req, pr->portno);
1613
        //FIXME
1614
        uint64_t nr_obj = calc_map_obj(map);
1615
        uint64_t deleted = 0;
1616
        while (deleted < nr_obj){ 
1617
                deleted = 0;
1618
                for (i = 0; i < nr_obj; i++){
1619
                        mn = get_mapnode(map, i);
1620
                        if (mn) {
1621
                                if (!(mn->flags & MF_OBJECT_DESTROYED)){
1622
                                        if (mn->flags & MF_OBJECT_EXIST){
1623
                                                //make sure all pending operations on all objects are completed
1624
                                                if (mn->flags & MF_OBJECT_NOT_READY){
1625
                                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1626
                                                }
1627
                                                req = delete_object(pr, mn);
1628
                                                if (!req)
1629
                                                        if (mio->del_pending){
1630
                                                                goto wait_pending;
1631
                                                        } else {
1632
                                                                continue;
1633
                                                        }
1634
                                                else {
1635
                                                        mio->del_pending++;
1636
                                                }
1637
                                        }
1638
                                        mn->flags &= MF_OBJECT_DESTROYED;
1639
                                }
1640
                                put_mapnode(mn);
1641
                        }
1642
                        deleted++;
1643
                }
1644
wait_pending:
1645
                mio->cb = deletion_cb;
1646
                wait_on_pr(pr, mio->del_pending > 0);
1647
        }
1648
        mio->cb = NULL;
1649
        map->flags &= ~MF_MAP_DELETING;
1650
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1651
        return do_close(pr, map);
1652
}
1653

    
1654
static int do_mapr(struct peer_req *pr, struct map *map)
1655
{
1656
        struct peerd *peer = pr->peer;
1657
        int r = req2objs(pr, map, 0);
1658
        if  (r < 0){
1659
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1660
                                map->volume, 
1661
                                (unsigned long long) pr->req->offset, 
1662
                                (unsigned long long) (pr->req->offset + pr->req->size));
1663
                return -1;
1664
        }
1665
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1666
                        map->volume, 
1667
                        (unsigned long long) pr->req->offset, 
1668
                        (unsigned long long) (pr->req->offset + pr->req->size));
1669
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1670
                        (unsigned long long) pr->req->offset,
1671
                        (unsigned long long) pr->req->size);
1672
        char buf[XSEG_MAX_TARGETLEN+1];
1673
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1674
        int i;
1675
        for (i = 0; i < reply->cnt; i++) {
1676
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1677
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1678
                buf[reply->segs[i].targetlen] = 0;
1679
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1680
                                (unsigned long long) reply->segs[i].offset,
1681
                                (unsigned long long) reply->segs[i].size);
1682
        }
1683
        return 0;
1684
}
1685

    
1686
static int do_mapw(struct peer_req *pr, struct map *map)
1687
{
1688
        struct peerd *peer = pr->peer;
1689
        int r = req2objs(pr, map, 1);
1690
        if  (r < 0){
1691
                XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1692
                                map->volume, 
1693
                                (unsigned long long) pr->req->offset, 
1694
                                (unsigned long long) (pr->req->offset + pr->req->size));
1695
                return -1;
1696
        }
1697
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1698
                        map->volume, 
1699
                        (unsigned long long) pr->req->offset, 
1700
                        (unsigned long long) (pr->req->offset + pr->req->size));
1701
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1702
                        (unsigned long long) pr->req->offset,
1703
                        (unsigned long long) pr->req->size);
1704
        char buf[XSEG_MAX_TARGETLEN+1];
1705
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1706
        int i;
1707
        for (i = 0; i < reply->cnt; i++) {
1708
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1709
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1710
                buf[reply->segs[i].targetlen] = 0;
1711
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1712
                                (unsigned long long) reply->segs[i].offset,
1713
                                (unsigned long long) reply->segs[i].size);
1714
        }
1715
        return 0;
1716
}
1717

    
1718
//here map is the parent map
1719
static int do_clone(struct peer_req *pr, struct map *map)
1720
{
1721
        /*
1722
        FIXME check if clone map exists
1723
        clonemap = get_map(pr, target, targetlen, MF_LOAD);
1724
        if (clonemap)
1725
                do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1726
                                        //        cache on non-exclusive opens or declare a NO_CACHE flag ?
1727
                return -1;
1728
        */
1729

    
1730
        int r;
1731
        char buf[XSEG_MAX_TARGETLEN];
1732
        struct peerd *peer = pr->peer;
1733
        struct mapperd *mapper = __get_mapperd(peer);
1734
        char *target = xseg_get_target(peer->xseg, pr->req);
1735
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1736
        XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1737
        struct map *clonemap = create_map(mapper, target, pr->req->targetlen,
1738
                                                MF_ARCHIP);
1739
        if (!clonemap) 
1740
                return -1;
1741

    
1742
        if (xclone->size == -1)
1743
                clonemap->size = map->size;
1744
        else
1745
                clonemap->size = xclone->size;
1746
        if (clonemap->size < map->size){
1747
                target = xseg_get_target(peer->xseg, pr->req);
1748
                strncpy(buf, target, pr->req->targetlen);
1749
                buf[pr->req->targetlen] = 0;
1750
                XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1751
                                "\n\t for requested clone %s",
1752
                                (unsigned long long) xclone->size,
1753
                                (unsigned long long) map->size, buf);
1754
                goto out_err;
1755
        }
1756

    
1757
        //alloc and init map_nodes
1758
        unsigned long c = clonemap->size/block_size + 1;
1759
        struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1760
        if (!map_nodes){
1761
                goto out_err;
1762
        }
1763
        int i;
1764
        for (i = 0; i < clonemap->size/block_size + 1; i++) {
1765
                struct map_node *mn = get_mapnode(map, i);
1766
                if (mn) {
1767
                        strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1768
                        map_nodes[i].objectlen = mn->objectlen;
1769
                        put_mapnode(mn);
1770
                } else {
1771
                        strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
1772
                        map_nodes[i].objectlen = ZERO_BLOCK_LEN;
1773
                }
1774
                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1775
                map_nodes[i].flags = 0;
1776
                map_nodes[i].objectidx = i;
1777
                map_nodes[i].map = clonemap;
1778
                map_nodes[i].ref = 1;
1779
                map_nodes[i].waiters = 0;
1780
                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1781
                r = insert_object(clonemap, &map_nodes[i]);
1782
                if (r < 0){
1783
                        XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1784
                        goto out_err;
1785
                }
1786
        }
1787
        r = write_map(pr, clonemap);
1788
        if (r < 0){
1789
                XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1790
                goto out_err;
1791
        }
1792
        return 0;
1793

    
1794
out_err:
1795
        put_map(clonemap);
1796
        return -1;
1797
}
1798

    
1799
/*
 * Open (optionally taking the exclusive on-disk lock) and load a map from
 * storage. With MF_EXCLUSIVE|MF_FORCE a failed open aborts; with
 * MF_EXCLUSIVE alone we fall back to loading without the lock.
 * On load failure any lock we acquired is released.
 *
 * Returns load_map()'s result, or -1 when a forced exclusive open fails.
 */
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
{
        int got_lock = 0;
        int r;

        if (flags & MF_EXCLUSIVE){
                if (open_map(pr, map, flags) < 0){
                        if (flags & MF_FORCE)
                                return -1;
                } else {
                        got_lock = 1;
                }
        }

        r = load_map(pr, map);
        if (r < 0 && got_lock)
                close_map(pr, map);     /* don't keep the lock on failure */
        return r;
}
1818

    
1819
/*
 * Look up a map by name in the mapper cache. With MF_LOAD a cache miss
 * creates the map and loads it from storage. A reference is taken on the
 * returned map (release with put_map()).
 *
 * Returns NULL on a miss without MF_LOAD, on create/load failure, or when
 * the cached map is already marked destroyed.
 */
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
                        uint32_t flags)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        struct map *map = find_map_len(mapper, name, namelen, flags);

        if (map){
                if (map->flags & MF_MAP_DESTROYED)
                        return NULL;
                __get_map(map);
                return map;
        }

        if (!(flags & MF_LOAD))
                return NULL;

        map = create_map(mapper, name, namelen, flags);
        if (!map)
                return NULL;
        if (open_load_map(pr, map, flags) < 0){
                /* creation succeeded but loading failed: discard it */
                do_dropcache(pr, map);
                return NULL;
        }
        __get_map(map);
        return map;
}
1846

    
1847
/*
 * Fetch the map named @name and run @action on it once the map is ready
 * (waiting and retrying while another request operates on it). After the
 * action, the cached map is dropped unless it is held exclusively.
 *
 * Returns -1 when the map cannot be obtained, otherwise @action's result.
 */
static int map_action(int (action)(struct peer_req *pr, struct map *map),
                struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
{
        struct map *map;
        int r;

        for (;;){
                map = get_map(pr, name, namelen, flags);
                if (!map)
                        return -1;
                if (!(map->flags & MF_MAP_NOT_READY))
                        break;
                /* busy: wait for the current operation, drop our ref, retry */
                wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
                put_map(map);
        }

        r = action(pr, map);
        //always drop cache if map not read exclusively
        if (!(map->flags & MF_MAP_EXCLUSIVE))
                do_dropcache(pr, map);
        signal_map(map);
        put_map(map);
        return r;
}
1869

    
1870
void * handle_info(struct peer_req *pr)
1871
{
1872
        struct peerd *peer = pr->peer;
1873
        char *target = xseg_get_target(peer->xseg, pr->req);
1874
        int r = map_action(do_info, pr, target, pr->req->targetlen,
1875
                                MF_ARCHIP|MF_LOAD);
1876
        if (r < 0)
1877
                fail(peer, pr);
1878
        else
1879
                complete(peer, pr);
1880
        ta--;
1881
        return NULL;
1882
}
1883

    
1884
/*
 * st-thread entry for X_CLONE requests.
 *
 * Two modes, selected by the request's xseg_request_clone data:
 *   - xclone->targetlen != 0: clone an existing (pithos) map into a new
 *     archipelago map via do_clone().
 *   - xclone->targetlen == 0: create a brand-new empty volume of
 *     xclone->size, fully backed by zero-block objects.
 *
 * Fails the request on any error, completes it otherwise.
 */
void * handle_clone(struct peer_req *pr)
{
        int r;
        struct peerd *peer = pr->peer;
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
        if (!xclone) {
                r = -1;
                goto out;
        }
        if (xclone->targetlen){
                //support clone only from pithos
                /* note: no MF_ARCHIP here — the source map name is used as-is */
                r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
                                        MF_LOAD);
        } else {
                if (!xclone->size){
                        /* refuse to create a zero-sized volume */
                        r = -1;
                } else {
                        //FIXME
                        struct map *map;
                        char *target = xseg_get_target(peer->xseg, pr->req);
                        XSEGLOG2(&lc, I, "Creating volume");
                        /* refuse to create a volume that already exists */
                        map = get_map(pr, target, pr->req->targetlen,
                                                MF_ARCHIP|MF_LOAD);
                        if (map){
                                XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
                                if (map->ref <= 2) //initial one + one ref from __get_map
                                        do_dropcache(pr, map); //we are the only ones using this map. Drop the cache.
                                put_map(map); //matches get_map
                                r = -1;
                                goto out;
                        }
                        //create a new empty map of size
                        map = create_map(mapper, target, pr->req->targetlen,
                                                MF_ARCHIP);
                        if (!map){
                                r = -1;
                                goto out;
                        }
                        map->size = xclone->size;
                        //populate_map with zero objects;
                        uint64_t nr_objs = xclone->size / block_size;
                        if (xclone->size % block_size)
                                nr_objs++;

                        struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
                        if (!map_nodes){
                                do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
                                r = -1;
                                goto out;
                        }
                        uint64_t i;
                        /* every object starts out as the shared zero block */
                        for (i = 0; i < nr_objs; i++) {
                                strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
                                map_nodes[i].objectlen = ZERO_BLOCK_LEN;
                                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
                                map_nodes[i].flags = 0;
                                map_nodes[i].objectidx = i;
                                map_nodes[i].map = map;
                                map_nodes[i].ref = 1;
                                map_nodes[i].waiters = 0;
                                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
                                r = insert_object(map, &map_nodes[i]);
                                if (r < 0){
                                        /* dropcache releases nodes inserted so far */
                                        do_dropcache(pr, map);
                                        r = -1;
                                        goto out;
                                }
                        }
                        /* persist the new map to storage */
                        r = write_map(pr, map);
                        if (r < 0){
                                XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
                                do_dropcache(pr, map);
                                goto out;
                        }
                        XSEGLOG2(&lc, I, "Volume %s created", map->volume);
                        r = 0;
                        do_dropcache(pr, map); //drop cache here for consistency
                }
        }
out:
        if (r < 0)
                fail(peer, pr);
        else
                complete(peer, pr);
        ta--;        /* balance the ta++ done at dispatch time */
        return NULL;
}
1971

    
1972
void * handle_mapr(struct peer_req *pr)
1973
{
1974
        struct peerd *peer = pr->peer;
1975
        char *target = xseg_get_target(peer->xseg, pr->req);
1976
        int r = map_action(do_mapr, pr, target, pr->req->targetlen,
1977
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
1978
        if (r < 0)
1979
                fail(peer, pr);
1980
        else
1981
                complete(peer, pr);
1982
        ta--;
1983
        return NULL;
1984
}
1985

    
1986
void * handle_mapw(struct peer_req *pr)
1987
{
1988
        struct peerd *peer = pr->peer;
1989
        char *target = xseg_get_target(peer->xseg, pr->req);
1990
        int r = map_action(do_mapw, pr, target, pr->req->targetlen,
1991
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1992
        if (r < 0)
1993
                fail(peer, pr);
1994
        else
1995
                complete(peer, pr);
1996
        XSEGLOG2(&lc, D, "Ta: %d", ta);
1997
        ta--;
1998
        return NULL;
1999
}
2000

    
2001
void * handle_destroy(struct peer_req *pr)
2002
{
2003
        struct peerd *peer = pr->peer;
2004
        char *target = xseg_get_target(peer->xseg, pr->req);
2005
        int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2006
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2007
        if (r < 0)
2008
                fail(peer, pr);
2009
        else
2010
                complete(peer, pr);
2011
        ta--;
2012
        return NULL;
2013
}
2014

    
2015
void * handle_close(struct peer_req *pr)
2016
{
2017
        struct peerd *peer = pr->peer;
2018
        char *target = xseg_get_target(peer->xseg, pr->req);
2019
        //here we do not want to load
2020
        int r = map_action(do_close, pr, target, pr->req->targetlen,
2021
                                MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2022
        if (r < 0)
2023
                fail(peer, pr);
2024
        else
2025
                complete(peer, pr);
2026
        ta--;
2027
        return NULL;
2028
}
2029

    
2030
int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
2031
                        struct xseg_request *req)
2032
{
2033
        //struct mapperd *mapper = __get_mapperd(peer);
2034
        struct mapper_io *mio = __get_mapper_io(pr);
2035
        void *(*action)(struct peer_req *) = NULL;
2036

    
2037
        mio->state = ACCEPTED;
2038
        mio->err = 0;
2039
        mio->cb = NULL;
2040
        switch (pr->req->op) {
2041
                /* primary xseg operations of mapper */
2042
                case X_CLONE: action = handle_clone; break;
2043
                case X_MAPR: action = handle_mapr; break;
2044
                case X_MAPW: action = handle_mapw; break;
2045
//                case X_SNAPSHOT: handle_snap(peer, pr, req); break;
2046
                case X_INFO: action = handle_info; break;
2047
                case X_DELETE: action = handle_destroy; break;
2048
                case X_CLOSE: action = handle_close; break;
2049
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2050
        }
2051
        if (action){
2052
                ta++;
2053
                mio->active = 1;
2054
                st_thread_create(action, pr, 0, 0);
2055
        }
2056
        return 0;
2057

    
2058
}
2059

    
2060
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2061
                enum dispatch_reason reason)
2062
{
2063
        struct mapperd *mapper = __get_mapperd(peer);
2064
        (void) mapper;
2065
        struct mapper_io *mio = __get_mapper_io(pr);
2066
        (void) mio;
2067

    
2068

    
2069
        if (reason == dispatch_accept)
2070
                dispatch_accepted(peer, pr, req);
2071
        else {
2072
                if (mio->cb){
2073
                        mio->cb(pr, req);
2074
                } else { 
2075
                        signal_pr(pr);
2076
                }
2077
        }
2078
        return 0;
2079
}
2080

    
2081
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2082
{
2083
        int i;
2084

    
2085
        //FIXME error checks
2086
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2087
        peer->priv = mapperd;
2088
        mapper = mapperd;
2089
        mapper->hashmaps = xhash_new(3, STRING);
2090

    
2091
        printf("%llu \n", MAX_VOLUME_SIZE);
2092
        for (i = 0; i < peer->nr_ops; i++) {
2093
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2094
                mio->copyups_nodes = xhash_new(3, INTEGER);
2095
                mio->copyups = 0;
2096
                mio->err = 0;
2097
                mio->active = 0;
2098
                peer->peer_reqs[i].priv = mio;
2099
        }
2100

    
2101
        for (i = 0; i < argc; i++) {
2102
                if (!strcmp(argv[i], "-bp") && (i+1) < argc){
2103
                        mapper->bportno = atoi(argv[i+1]);
2104
                        i += 1;
2105
                        continue;
2106
                }
2107
                if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
2108
                        mapper->mbportno = atoi(argv[i+1]);
2109
                        i += 1;
2110
                        continue;
2111
                }
2112
                /* enforce only one thread */
2113
                if (!strcmp(argv[i], "-t") && (i+1) < argc){
2114
                        int t = atoi(argv[i+1]);
2115
                        if (t != 1) {
2116
                                printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
2117
                                return -1;
2118
                        }
2119
                        i += 1;
2120
                        continue;
2121
                }
2122
        }
2123

    
2124
        const struct sched_param param = { .sched_priority = 99 };
2125
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2126

    
2127

    
2128
//        test_map(peer);
2129

    
2130
        return 0;
2131
}
2132

    
2133
/*
 * Peer shutdown hook: walk every cached map and release the on-disk
 * exclusive lock of each map that was opened with MF_MAP_EXCLUSIVE, so
 * other mappers can take over the volumes after we exit.
 */
void custom_peer_finalize(struct peerd *peer)
{
        struct mapperd *mapper = __get_mapperd(peer);
        /* borrow a peer request to drive the close_map I/O */
        struct peer_req *pr = alloc_peer_req(peer);
        if (!pr){
                XSEGLOG2(&lc, E, "Cannot get peer request");
                return;
        }
        int r;
        struct map *map;
        xhash_iter_t it;
        xhashidx key, val;
        xhash_iter_init(mapper->hashmaps, &it);
        while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
                map = (struct map *)val;
                if (!(map->flags & MF_MAP_EXCLUSIVE))
                        continue;
                if (close_map(pr, map) < 0)
                        XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
        }
        /* NOTE(review): pr is never returned to the pool (no free_peer_req
         * counterpart is called). Harmless at process exit, but confirm the
         * intended cleanup API. The unused 'r' above looks vestigial. */
        return;
}
2157

    
2158
void print_obj(struct map_node *mn)
2159
{
2160
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2161
                        (unsigned long long) mn->objectidx, mn->object, 
2162
                        (unsigned int) mn->objectlen, 
2163
                        (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2164
}
2165

    
2166
void print_map(struct map *m)
2167
{
2168
        uint64_t nr_objs = m->size/block_size;
2169
        if (m->size % block_size)
2170
                nr_objs++;
2171
        fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2172
                        m->volume, m->volumelen, 
2173
                        (unsigned long long) m->size, 
2174
                        (unsigned long long) nr_objs,
2175
                        m->version);
2176
        uint64_t i;
2177
        struct map_node *mn;
2178
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2179
                return;
2180
        for (i = 0; i < nr_objs; i++) {
2181
                mn = find_object(m, i);
2182
                if (!mn){
2183
                        printf("object idx [%llu] not found!\n", (unsigned long long) i);
2184
                        continue;
2185
                }
2186
                print_obj(mn);
2187
        }
2188
}
2189

    
2190
/*
2191
void test_map(struct peerd *peer)
2192
{
2193
        int i,j, ret;
2194
        //struct sha256_ctx sha256ctx;
2195
        unsigned char buf[SHA256_DIGEST_SIZE];
2196
        char buf_new[XSEG_MAX_TARGETLEN + 20];
2197
        struct map *m = malloc(sizeof(struct map));
2198
        strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2199
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2200
        strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2201
        buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2202
        m->volumelen = XSEG_MAX_TARGETLEN;
2203
        m->size = 100*block_size;
2204
        m->objects = xhash_new(3, INTEGER);
2205
        struct map_node *map_node = calloc(100, sizeof(struct map_node));
2206
        for (i = 0; i < 100; i++) {
2207
                sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2208
                gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2209
                
2210
                for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2211
                        sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2212
                }
2213
                map_node[i].objectidx = i;
2214
                map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2215
                map_node[i].flags = MF_OBJECT_EXIST;
2216
                ret = insert_object(m, &map_node[i]);
2217
        }
2218

2219
        char *data = malloc(block_size);
2220
        mapheader_to_map(m, data);
2221
        uint64_t pos = mapheader_size;
2222

2223
        for (i = 0; i < 100; i++) {
2224
                map_node = find_object(m, i);
2225
                if (!map_node){
2226
                        printf("no object node %d \n", i);
2227
                        exit(1);
2228
                }
2229
                object_to_map(data+pos, map_node);
2230
                pos += objectsize_in_map;
2231
        }
2232
//        print_map(m);
2233

2234
        struct map *m2 = malloc(sizeof(struct map));
2235
        strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2236
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2237
        m->volumelen = XSEG_MAX_TARGETLEN;
2238

2239
        m2->objects = xhash_new(3, INTEGER);
2240
        ret = read_map(peer, m2, data);
2241
//        print_map(m2);
2242

2243
        int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2244
        ssize_t r, sum = 0;
2245
        while (sum < block_size) {
2246
                r = write(fd, data + sum, block_size -sum);
2247
                if (r < 0){
2248
                        perror("write");
2249
                        printf("write error\n");
2250
                        exit(1);
2251
                } 
2252
                sum += r;
2253
        }
2254
        close(fd);
2255
        map_node = find_object(m, 0);
2256
        free(map_node);
2257
        free(m);
2258
}
2259
*/