Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-mapperd.c @ a1e35fad

History | View | Annotate | Download (80.5 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <unistd.h>
37
#include <sys/types.h>
38
#include <pthread.h>
39
#include <xseg/xseg.h>
40
#include <peer.h>
41
#include <time.h>
42
#include <xtypes/xlock.h>
43
#include <xtypes/xhash.h>
44
#include <xseg/protocol.h>
45
#include <sys/stat.h>
46
#include <fcntl.h>
47
#include <errno.h>
48
#include <sched.h>
49
#include <sys/syscall.h>
50
#include <openssl/sha.h>
51
#include <ctype.h>
52

    
53
/* general mapper flags */
54
#define MF_LOAD         (1 << 0)
55
#define MF_EXCLUSIVE         (1 << 1)
56
#define MF_FORCE         (1 << 2)
57
#define MF_ARCHIP        (1 << 3)
58

    
59
#ifndef SHA256_DIGEST_SIZE
60
#define SHA256_DIGEST_SIZE 32
61
#endif
62
/* hex representation of sha256 value takes up double the sha256 size */
63
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64

    
65
#define block_size (1<<22) //FIXME this should be defined here?
66

    
67
/* transparency byte + max object len in disk */
68
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69

    
70
/* Map header contains:
71
 *         map version
72
 *         volume size
73
 */
74
#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75

    
76

    
77
#define MAPPER_PREFIX "archip_"
78
#define MAPPER_PREFIX_LEN 7
79

    
80
#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82

    
83
#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84
#error         "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85
#endif
86

    
87
#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88

    
89
#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90
#error         "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91
#endif
92

    
93
#define MAX_VOLUME_SIZE \
94
((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95

    
96

    
97
//char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98

    
99
/* pithos considers this a block full of zeros, so should we.
100
 * it is actually the sha256 hash of nothing.
101
 */
102
char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104

    
105
/* dispatch_internal mapper states */
106
enum mapper_state {
107
        ACCEPTED = 0,
108
        WRITING = 1,
109
        COPYING = 2,
110
        DELETING = 3,
111
        DROPPING_CACHE = 4
112
};
113

    
114
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115

    
116

    
117
/* mapper object flags */
118
#define MF_OBJECT_EXIST                (1 << 0)
119
#define MF_OBJECT_COPYING        (1 << 1)
120
#define MF_OBJECT_WRITING        (1 << 2)
121
#define MF_OBJECT_DELETING        (1 << 3)
122
#define MF_OBJECT_DESTROYED        (1 << 5)
123
#define MF_OBJECT_SNAPSHOTTING        (1 << 6)
124

    
125
#define MF_OBJECT_NOT_READY        (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126
                                MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127
/* In-memory state of a single mapped object (one block of a volume). */
struct map_node {
        uint32_t flags;                /* MF_OBJECT_* state flags */
        uint32_t objectidx;        /* index of this object inside its map */
        uint32_t objectlen;        /* strlen(object) */
        char object[MAX_OBJECT_LEN + 1];         /* NULL terminated string */
        struct map *map;        /* back-pointer to the owning map */
        uint32_t ref;                /* reference count */
        uint32_t waiters;        /* nr of state threads blocked on cond */
        st_cond_t cond;                /* waiters sleep here until signaled */
};
137

    
138

    
139
#define wait_on_pr(__pr, __condition__)         \
140
        do {                                        \
141
                ta--;                                \
142
                __get_mapper_io(pr)->active = 0;\
143
                XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144
                st_cond_wait(__pr->cond);        \
145
        } while (__condition__)
146

    
147
#define wait_on_mapnode(__mn, __condition__)        \
148
        do {                                        \
149
                ta--;                                \
150
                __mn->waiters++;                \
151
                XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152
                        ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153
                st_cond_wait(__mn->cond);        \
154
        } while (__condition__)
155

    
156
#define wait_on_map(__map, __condition__)        \
157
        do {                                        \
158
                ta--;                                \
159
                __map->waiters++;                \
160
                XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161
                                   __map, __map->volume, __map->waiters, ta); \
162
                st_cond_wait(__map->cond);        \
163
        } while (__condition__)
164

    
165
#define signal_pr(__pr)                                \
166
        do {                                         \
167
                if (!__get_mapper_io(pr)->active){\
168
                        ta++;                        \
169
                        XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170
                        __get_mapper_io(pr)->active = 1;\
171
                        st_cond_signal(__pr->cond);        \
172
                }                                \
173
        }while(0)
174

    
175
#define signal_map(__map)                        \
176
        do {                                         \
177
                XSEGLOG2(&lc, D, "Checking map %lx %s. Waiters %u, ta: %u", \
178
                                __map, __map->volume, __map->waiters, ta);  \
179
                if (__map->waiters) {                \
180
                        ta += __map->waiters;                \
181
                        XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
182
                        ta: %u",  __map, __map->volume, __map->waiters, ta); \
183
                        __map->waiters = 0;        \
184
                        st_cond_broadcast(__map->cond);        \
185
                }                                \
186
        }while(0)
187

    
188
#define signal_mapnode(__mn)                        \
189
        do {                                         \
190
                if (__mn->waiters) {                \
191
                        ta += __mn->waiters;        \
192
                        XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
193
                        %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
194
                        __mn->waiters = 0;        \
195
                        st_cond_broadcast(__mn->cond);        \
196
                }                                \
197
        }while(0)
198

    
199

    
200
/* map flags */
201
#define MF_MAP_LOADING                (1 << 0)
202
#define MF_MAP_DESTROYED        (1 << 1)
203
#define MF_MAP_WRITING                (1 << 2)
204
#define MF_MAP_DELETING                (1 << 3)
205
#define MF_MAP_DROPPING_CACHE        (1 << 4)
206
#define MF_MAP_EXCLUSIVE        (1 << 5)
207
#define MF_MAP_OPENING                (1 << 6)
208
#define MF_MAP_CLOSING                (1 << 7)
209
#define MF_MAP_DELETED                (1 << 8)
210
#define MF_MAP_SNAPSHOTTING        (1 << 9)
211

    
212
#define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
213
                                MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|               \
214
                                MF_MAP_SNAPSHOTTING)
215

    
216
/* In-memory representation of one volume map. */
struct map {
        uint32_t version;        /* on-disk map format version */
        uint32_t flags;                /* MF_MAP_* state flags */
        uint64_t size;                /* volume size in bytes */
        uint32_t volumelen;        /* strlen(volume) */
        char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
        xhash_t *objects;         /* obj_index --> map_node */
        uint32_t ref;                /* reference count */
        uint32_t waiters;        /* nr of state threads blocked on cond */
        st_cond_t cond;                /* waiters sleep here until signaled */
};
227

    
228
/* Global mapper peer state, stored in peerd->priv. */
struct mapperd {
        xport bportno;                /* blocker that accesses data */
        xport mbportno;                /* blocker that accesses maps */
        xhash_t *hashmaps; // hash_function(target) --> struct map
};
233

    
234
/* Per peer_req mapper state, stored in peer_req->priv. */
struct mapper_io {
        volatile uint32_t copyups;        /* nr of copyups pending, issued by this mapper io */
        xhash_t *copyups_nodes;                /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
        struct map_node *copyup_node;        /* node currently being copied up */
        volatile int err;                        /* error flag */
        volatile uint64_t del_pending;        /* nr of pending delete sub-requests */
        volatile uint64_t snap_pending;        /* nr of pending snapshot sub-requests */
        uint64_t delobj;        /* next object index to delete */
        uint64_t dcobj;                /* next object index for cache drop */
        cb_t cb;                /* callback invoked on sub-request completion */
        enum mapper_state state;        /* dispatch_internal state machine */
        volatile int active;        /* set while the state thread is runnable */
};
247

    
248
/* global vars */
249
struct mapperd *mapper;
250

    
251
void print_map(struct map *m);
252

    
253

    
254
/*
 * Print the mapper-specific command line options on stderr.
 * Invoked by the generic peer code when usage help is requested.
 */
void custom_peer_usage()
{
        static const char usage_msg[] =
                "Custom peer options: \n"
                "-bp  : port for block blocker(!)\n"
                "-mbp : port for map blocker\n"
                "\n";

        fputs(usage_msg, stderr);
}
261

    
262

    
263
/*
264
 * Helper functions
265
 */
266

    
267
/* Return the global mapper state stored in the peer's private data. */
static inline struct mapperd * __get_mapperd(struct peerd *peer)
{
        return (struct mapperd *) peer->priv;
}
271

    
272
/* Return the per-request mapper state stored in the peer_req's private data. */
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
{
        return (struct mapper_io *) pr->priv;
}
276

    
277
static inline uint64_t calc_map_obj(struct map *map)
278
{
279
        if (map->size == -1)
280
                return 0;
281
        uint64_t nr_objs = map->size / block_size;
282
        if (map->size % block_size)
283
                nr_objs++;
284
        return nr_objs;
285
}
286

    
287
static uint32_t calc_nr_obj(struct xseg_request *req)
288
{
289
        unsigned int r = 1;
290
        uint64_t rem_size = req->size;
291
        uint64_t obj_offset = req->offset & (block_size -1); //modulo
292
        uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
293
        rem_size -= obj_size;
294
        while (rem_size > 0) {
295
                obj_size = (rem_size > block_size) ? block_size : rem_size;
296
                rem_size -= obj_size;
297
                r++;
298
        }
299

    
300
        return r;
301
}
302

    
303
/* hexlify function.
304
 * Unsafe. Doesn't check if data length is odd!
305
 */
306

    
307
/*
 * Write the lowercase hex representation of the SHA256_DIGEST_LENGTH
 * bytes at @data into @hex. Note each sprintf() also stores a trailing
 * NUL, so @hex needs room for 2 * SHA256_DIGEST_LENGTH + 1 bytes.
 */
static void hexlify(unsigned char *data, char *hex)
{
        int i;
        for (i=0; i<SHA256_DIGEST_LENGTH; i++)
                sprintf(hex+2*i, "%02x", data[i]);
}
313

    
314
static void unhexlify(char *hex, unsigned char *data)
315
{
316
        int i;
317
        char c;
318
        for (i=0; i<SHA256_DIGEST_LENGTH; i++){
319
                data[i] = 0;
320
                c = hex[2*i];
321
                if (isxdigit(c)){
322
                        if (isdigit(c)){
323
                                c-= '0';
324
                        }
325
                        else {
326
                                c = tolower(c);
327
                                c = c-'a' + 10;
328
                        }
329
                }
330
                else {
331
                        c = 0;
332
                }
333
                data[i] |= (c << 4) & 0xF0;
334
                c = hex[2*i+1];
335
                if (isxdigit(c)){
336
                        if (isdigit(c)){
337
                                c-= '0';
338
                        }
339
                        else {
340
                                c = tolower(c);
341
                                c = c-'a' + 10;
342
                        }
343
                }
344
                else {
345
                        c = 0;
346
                }
347
                data[i] |= c & 0x0F;
348
        }
349
}
350

    
351
void merkle_hash(unsigned char *hashes, unsigned long len,
352
                unsigned char hash[SHA256_DIGEST_SIZE])
353
{
354
        uint32_t i, l, s = 2;
355
        uint32_t nr = len/SHA256_DIGEST_SIZE;
356
        unsigned char *buf;
357
        unsigned char tmp_hash[SHA256_DIGEST_SIZE];
358

    
359
        if (!nr){
360
                SHA256(hashes, 0, hash);
361
                return;
362
        }
363
        if (nr == 1){
364
                memcpy(hash, hashes, SHA256_DIGEST_SIZE);
365
                return;
366
        }
367
        while (s < nr)
368
                s = s << 1;
369
        buf = malloc(sizeof(unsigned char)* SHA256_DIGEST_SIZE * s);
370
        memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
371
        memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
372
        for (l = s; l > 1; l = l/2) {
373
                for (i = 0; i < l; i += 2) {
374
                        SHA256(buf + (i * SHA256_DIGEST_SIZE),
375
                                        2 * SHA256_DIGEST_SIZE, tmp_hash);
376
                        memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
377
                                        tmp_hash, SHA256_DIGEST_SIZE);
378
                }
379
        }
380
        memcpy(hash, buf, SHA256_DIGEST_SIZE);
381
}
382

    
383
/*
384
 * Maps handling functions
385
 */
386

    
387
/*
 * Look up a cached map by its NUL-terminated volume name.
 * Returns NULL when the volume is not present in mapper->hashmaps.
 */
static struct map * find_map(struct mapperd *mapper, char *volume)
{
        struct map *m = NULL;
        int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
                                (xhashidx *) &m);
        if (r < 0)
                return NULL;
        return m;
}
396

    
397
static struct map * find_map_len(struct mapperd *mapper, char *target,
398
                                        uint32_t targetlen, uint32_t flags)
399
{
400
        char buf[XSEG_MAX_TARGETLEN+1];
401
        if (flags & MF_ARCHIP){
402
                strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
403
                strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
404
                buf[MAPPER_PREFIX_LEN + targetlen] = 0;
405
                targetlen += MAPPER_PREFIX_LEN;
406
        }
407
        else {
408
                strncpy(buf, target, targetlen);
409
                buf[targetlen] = 0;
410
        }
411

    
412
        if (targetlen > MAX_VOLUME_LEN){
413
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
414
                                        targetlen, MAX_VOLUME_LEN);
415
                return NULL;
416
        }
417

    
418
        XSEGLOG2(&lc, D, "looking up map %s, len %u",
419
                        buf, targetlen);
420
        return find_map(mapper, buf);
421
}
422

    
423

    
424
/*
 * Register @map in mapper->hashmaps, keyed by its volume name.
 * Grows the hash table as needed.
 * Returns the xhash_insert() result (0 on success), or -1 when the map
 * is already present or the table cannot grow.
 */
static int insert_map(struct mapperd *mapper, struct map *map)
{
        int r = -1;

        if (find_map(mapper, map->volume)){
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
                goto out;
        }

        XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
                        map->volume, strlen(map->volume), (unsigned long) map);
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
        while (r == -XHASH_ERESIZE) {
                /* table full: grow it and retry the insert */
                xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
                                        (unsigned long long) shift);
                        goto out;
                }
                mapper->hashmaps = new_hashmap;
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
        }
out:
        return r;
}
450

    
451
/*
 * Remove @map from mapper->hashmaps, shrinking the hash table if the
 * delete requests it. Does not free the map itself.
 * Returns the xhash_delete() result (0 on success, negative on error).
 */
static int remove_map(struct mapperd *mapper, struct map *map)
{
        int r = -1;

        //assert no pending pr on map

        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
        while (r == -XHASH_ERESIZE) {
                /* table too sparse: shrink it and retry the delete */
                xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
                                        (unsigned long long) shift);
                        goto out;
                }
                mapper->hashmaps = new_hashmap;
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
        }
out:
        return r;
}
472

    
473
/*
 * Allocate and submit an X_RELEASE request for @map to the maps
 * blocker, marking the map MF_MAP_CLOSING once submission succeeds.
 *
 * Returns the in-flight request — the caller must wait for completion
 * and put the request back — or NULL if allocation, preparation, or
 * submission failed (all acquired resources are rolled back).
 */
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
{
        int r;
        xport p;
        struct peerd *peer = pr->peer;
        struct xseg_request *req;
        struct mapperd *mapper = __get_mapperd(peer);
        void *dummy;

        XSEGLOG2(&lc, I, "Closing map %s", map->volume);

        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                map->volume);
                goto out_err;
        }

        r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
                                map->volume);
                goto out_put;
        }

        char *reqtarget = xseg_get_target(peer->xseg, req);
        if (!reqtarget)
                goto out_put;
        strncpy(reqtarget, map->volume, req->targetlen);
        req->op = X_RELEASE;
        req->size = 0;
        req->offset = 0;
        /* associate the request with pr so the completion path can
         * wake the waiting state thread */
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
                                map->volume);
                goto out_put;
        }
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
                goto out_unset;
        }
        r = xseg_signal(peer->xseg, p);
        map->flags |= MF_MAP_CLOSING;

        XSEGLOG2(&lc, I, "Map %s closing", map->volume);
        return req;

out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_err:
        return NULL;
}
530

    
531
/*
 * Synchronously release @map: submit the X_RELEASE request and block
 * the current state thread until the blocker serves or fails it.
 * Clears MF_MAP_CLOSING afterwards.
 * Returns 0 on success, -1 when the request failed or couldn't be sent.
 */
static int close_map(struct peer_req *pr, struct map *map)
{
        int err;
        struct xseg_request *req;
        struct peerd *peer = pr->peer;

        req = __close_map(pr, map);
        if (!req)
                return -1;
        /* sleep until the request reaches a terminal state */
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        map->flags &= ~MF_MAP_CLOSING;
        err = req->state & XS_FAILED;
        xseg_put_request(peer->xseg, req, pr->portno);
        if (err)
                return -1;
        return 0;
}
548

    
549
/*
550
static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
551
                                char *target, uint32_t targetlen, struct map **m)
552
{
553
        struct mapperd *mapper = __get_mapperd(peer);
554
        int r;
555
        *m = find_map(mapper, target, targetlen);
556
        if (*m) {
557
                XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
558
                if ((*m)->flags & MF_MAP_NOT_READY) {
559
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
560
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
561
                        return MF_PENDING;
562
                //} else if ((*m)->flags & MF_MAP_DESTROYED){
563
                //        return -1;
564
                // 
565
                }else {
566
                        XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
567
                        return 0;
568
                }
569
        }
570
        r = open_map(peer, pr, target, targetlen, 0);
571
        if (r < 0)
572
                return -1; //error
573
        return MF_PENDING;        
574
}
575
*/
576
/*
577
 * Object handling functions
578
 */
579

    
580
/*
 * Look up the map_node for @obj_index in @map->objects.
 * Returns NULL when no object with that index is cached.
 */
struct map_node *find_object(struct map *map, uint64_t obj_index)
{
        struct map_node *mn;
        int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
        if (r < 0)
                return NULL;
        return mn;
}
588

    
589
static int insert_object(struct map *map, struct map_node *mn)
590
{
591
        //FIXME no find object first
592
        int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
593
        if (r == -XHASH_ERESIZE) {
594
                unsigned long shift = xhash_grow_size_shift(map->objects);
595
                map->objects = xhash_resize(map->objects, shift, NULL);
596
                if (!map->objects)
597
                        return -1;
598
                r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
599
        }
600
        return r;
601
}
602

    
603

    
604
/*
605
 * map read/write functions
606
 *
607
 * version 0 -> pithos map
608
 * version 1 -> archipelago version 1
609
 *
610
 *
611
 * functions
612
 *         int read_object(struct map_node *mn, unsigned char *buf)
613
 *         int prepare_write_object(struct peer_req *pr, struct map *map,
614
 *                                 struct map_node *mn, struct xseg_request *req)
615
 *         int read_map(struct map *m, unsigned char * data)
616
 *         int prepare_write_map(struct peer_req *pr, struct map *map,
617
 *                                          struct xseg_request *req)
618
 */
619

    
620
struct map_functions {
621
        int (*read_object)(struct map_node *mn, unsigned char *buf);
622
          int (*prepare_write_object)(struct peer_req *pr, struct map *map,
623
                                  struct map_node *mn, struct xseg_request *req);
624
          int (*read_map)(struct map *m, unsigned char * data);
625
         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
626
                                           struct xseg_request *req);
627
};
628

    
629
/* version 0 functions */
630

    
631
/* no header */
632
#define v0_mapheader_size 0
633
/* just the unhexlified name */
634
#define v0_objectsize_in_map SHA256_DIGEST_SIZE
635

    
636
/*
 * Decode one v0 (pithos) map entry: @buf holds the raw SHA256 object
 * name, which is hexlified into mn->object. v0 objects always exist.
 * Always returns 0.
 */
static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
{
        hexlify(buf, mn->object);
        mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
        mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
        mn->flags = MF_OBJECT_EXIST;

        return 0;
}
645

    
646
/* Encode @mn as a v0 map entry: just the unhexlified object name. */
static void v0_object_to_map(struct map_node *mn, unsigned char *data)
{
        unhexlify(mn->object, data);
}
650

    
651
static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
652
                        struct map_node *mn, struct xseg_request *req)
653
{
654
        struct peerd *peer = pr->peer;
655
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
656
        if (r < 0){
657
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
658
                                "(Map: %s [%llu]",
659
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
660
                return -1;
661
        }
662
        char *target = xseg_get_target(peer->xseg, req);
663
        strncpy(target, map->volume, req->targetlen);
664
        req->size = req->datalen;
665
        req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
666

    
667
        unsigned char *data = xseg_get_data(pr->peer->xseg, req);
668
        v0_object_to_map(mn, data);
669
        return -1;
670
}
671

    
672
/*
 * Parse a version 0 (pithos) map block at @data into @m.
 *
 * v0 maps have no header: the block is a dense array of raw SHA256
 * object names. Parsing stops at the first all-zero entry; the map
 * size is then derived from the number of objects found.
 * Returns 0 on success, -1 on allocation failure.
 *
 * NOTE(review): map_node is allocated for the worst case and unused
 * tail entries are never inserted — confirm the chunk's lifetime is
 * handled by the node ref counting.
 */
static int read_map_v0(struct map *m, unsigned char * data)
{
        int r;
        struct map_node *map_node;
        uint64_t i;
        uint64_t pos = 0;
        uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
        XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
        char nulls[SHA256_DIGEST_SIZE];
        memset(nulls, 0, SHA256_DIGEST_SIZE);
        map_node = calloc(max_nr_objs, sizeof(struct map_node));
        if (!map_node)
                return -1;
        for (i = 0; i < max_nr_objs; i++) {
                /* an all-zero name marks the end of the map */
                if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
                        break;
                map_node[i].objectidx = i;
                map_node[i].map = m;
                map_node[i].waiters = 0;
                map_node[i].ref = 1;
                map_node[i].cond = st_cond_new(); //FIXME err check;
                read_object_v0(&map_node[i], data+pos);
                pos += v0_objectsize_in_map;
                r = insert_object(m, &map_node[i]); //FIXME error check
        }
        XSEGLOG2(&lc, D, "Found %llu objects", i);
        m->size = i * block_size;
        return 0;
}
701

    
702
/*
 * Build an X_WRITE request containing the full v0 on-disk map of @map.
 * Serializes every object up to calc_map_obj(map) into the request
 * data area.
 * Returns 0 on success, -1 when preparation fails or an object is
 * missing from the cache.
 */
static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
                                struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
        struct map_node *mn;
        int r = xseg_prep_request(peer->xseg, req, map->volumelen,
                        v0_mapheader_size + max_objidx * v0_objectsize_in_map);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
                return -1;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, map->volume, req->targetlen);
        char *data = xseg_get_data(peer->xseg, req);

        req->op = X_WRITE;
        req->size = req->datalen;
        req->offset = 0;

        for (i = 0; i < max_objidx; i++) {
                mn = find_object(map, i);
                if (!mn){
                        XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
                                        (unsigned long long) i, map->volume);
                        return -1;
                }
                v0_object_to_map(mn, (unsigned char *)(data+pos));
                pos += v0_objectsize_in_map;
        }
        XSEGLOG2(&lc, D, "Prepared %llu objects", i);
        return 0;
}
735

    
736
/* static struct map_functions map_functions_v0 =        { */
737
/*                         .read_object = read_object_v0, */
738
/*                         .read_map = read_map_v0, */
739
/*                         .prepare_write_object = prepare_write_object_v0, */
740
/*                         .prepare_write_map = prepare_write_map_v0 */
741
/* }; */
742
#define map_functions_v0 {                                \
743
                        .read_object = read_object_v0,        \
744
                        .read_map = read_map_v0,        \
745
                        .prepare_write_object = prepare_write_object_v0,\
746
                        .prepare_write_map = prepare_write_map_v0        \
747
                        }
748
/* v1 functions */
749

    
750
/* transparency byte + max object len in disk */
751
#define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
752

    
753
/* Map header contains:
754
 *         map version
755
 *         volume size
756
 */
757
#define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
758

    
759
/*
 * Decode one v1 on-disk map entry at @buf into @mn.
 * buf[0] is the "exists" byte: when set, the object is an archipelago
 * object and its hexlified name gets the MAPPER_PREFIX prepended;
 * otherwise the bare hexlified name is used (e.g. a pithos block).
 * Always returns 0.
 */
static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
{
        char c = buf[0];
        mn->flags = 0;
        if (c){
                mn->flags |= MF_OBJECT_EXIST;
                strcpy(mn->object, MAPPER_PREFIX);
                hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
                mn->object[MAX_OBJECT_LEN] = 0;
                mn->objectlen = strlen(mn->object);
        }
        else {
                mn->flags &= ~MF_OBJECT_EXIST;
                hexlify(buf+1, mn->object);
                mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
                mn->objectlen = strlen(mn->object);
        }
        return 0;
}
778

    
779
/*
 * Encode @mn as a v1 on-disk map entry at @buf: one "exists" byte
 * followed by the unhexlified object name. The MAPPER_PREFIX is
 * stripped from archipelago objects before unhexlifying.
 */
static inline void v1_object_to_map(char* buf, struct map_node *mn)
{
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
        if (buf[0]){
                /* strip common prefix */
                unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
        }
        else {
                unhexlify(mn->object, (unsigned char *)(buf+1));
        }
}
790

    
791
/*
 * Prepare @req to write the single v1 map entry for @mn at its slot in
 * @map's on-disk map (header + objectidx * entry size).
 * Returns 0 on success, -1 when the request cannot be prepared.
 */
static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
                                struct map_node *mn, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
                                "(Map: %s [%llu]",
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                return -1;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, map->volume, req->targetlen);
        req->size = req->datalen;
        req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;

        char *data = xseg_get_data(pr->peer->xseg, req);
        v1_object_to_map(data, mn);
        return 0;
}
811

    
812
static int read_map_v1(struct map *m, unsigned char * data)
813
{
814
        int r;
815
        struct map_node *map_node;
816
        uint64_t i;
817
        uint64_t pos = 0;
818
        uint64_t nr_objs;
819

    
820
        /* read header */
821
        m->version = *(uint32_t *) (data + pos);
822
        pos += sizeof(uint32_t);
823
        m->size = *(uint64_t *) (data + pos);
824
        pos += sizeof(uint64_t);
825

    
826
        /* read objects */
827
        nr_objs = m->size / block_size;
828
        if (m->size % block_size)
829
                nr_objs++;
830
        map_node = calloc(nr_objs, sizeof(struct map_node));
831
        if (!map_node)
832
                return -1;
833

    
834
        for (i = 0; i < nr_objs; i++) {
835
                map_node[i].map = m;
836
                map_node[i].objectidx = i;
837
                map_node[i].waiters = 0;
838
                map_node[i].ref = 1;
839
                map_node[i].cond = st_cond_new(); //FIXME err check;
840
                read_object_v1(&map_node[i], data+pos);
841
                pos += objectsize_in_map;
842
                r = insert_object(m, &map_node[i]); //FIXME error check
843
        }
844
        return 0;
845
}
846

    
847
static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
848
                                struct xseg_request *req)
849
{
850
        struct peerd *peer = pr->peer;
851
        uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
852
        struct map_node *mn;
853

    
854
        int r = xseg_prep_request(peer->xseg, req, m->volumelen,
855
                        v1_mapheader_size + max_objidx * v1_objectsize_in_map);
856
        if (r < 0){
857
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
858
                return -1;
859
        }
860
        char *target = xseg_get_target(peer->xseg, req);
861
        strncpy(target, m->volume, req->targetlen);
862
        char *data = xseg_get_data(peer->xseg, req);
863

    
864
        memcpy(data + pos, &m->version, sizeof(m->version));
865
        pos += sizeof(m->version);
866
        memcpy(data + pos, &m->size, sizeof(m->size));
867
        pos += sizeof(m->size);
868

    
869
        req->op = X_WRITE;
870
        req->size = req->datalen;
871
        req->offset = 0;
872

    
873
        for (i = 0; i < max_objidx; i++) {
874
                mn = find_object(m, i);
875
                if (!mn){
876
                        XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
877
                                        (unsigned long long) i, m->volume);
878
                        return -1;
879
                }
880
                v1_object_to_map(data+pos, mn);
881
                pos += v1_objectsize_in_map;
882
        }
883
        return 0;
884
}
885

    
886
/*
 * v1 dispatch-table entry. Kept as a macro (rather than a named struct
 * object) so it can appear inside the static initializer of the
 * map_functions[] array below. Dead commented-out struct removed.
 */
#define map_functions_v1 {						\
			.read_object = read_object_v1,			\
			.read_map = read_map_v1,			\
			.prepare_write_object = prepare_write_object_v1,\
			.prepare_write_map = prepare_write_map_v1	\
			}
898

    
899
static struct map_functions map_functions[] = { map_functions_v0,
900
                                                map_functions_v1 };
901
#define MAP_LATEST_VERSION 1
902
/* end of functions */
903

    
904

    
905

    
906

    
907

    
908
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
909
{
910
        hexlify(buf, mn->object);
911
        mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
912
        mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
913
        mn->flags = MF_OBJECT_EXIST;
914
}
915

    
916
static inline void map_to_object(struct map_node *mn, unsigned char *buf)
917
{
918
        char c = buf[0];
919
        mn->flags = 0;
920
        if (c){
921
                mn->flags |= MF_OBJECT_EXIST;
922
                strcpy(mn->object, MAPPER_PREFIX);
923
                hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
924
                mn->object[MAX_OBJECT_LEN] = 0;
925
                mn->objectlen = strlen(mn->object);
926
        }
927
        else {
928
                hexlify(buf+1, mn->object);
929
                mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
930
                mn->objectlen = strlen(mn->object);
931
        }
932

    
933
}
934

    
935
static inline void object_to_map(char* buf, struct map_node *mn)
936
{
937
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
938
        if (buf[0]){
939
                /* strip common prefix */
940
                unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
941
        }
942
        else {
943
                unhexlify(mn->object, (unsigned char *)(buf+1));
944
        }
945
}
946

    
947
static inline void mapheader_to_map(struct map *m, char *buf)
948
{
949
        uint64_t pos = 0;
950
        memcpy(buf + pos, &m->version, sizeof(m->version));
951
        pos += sizeof(m->version);
952
        memcpy(buf + pos, &m->size, sizeof(m->size));
953
        pos += sizeof(m->size);
954
}
955

    
956

    
957
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
958
                                struct map *map, struct map_node *mn)
959
{
960
        int r;
961
        void *dummy;
962
        struct mapperd *mapper = __get_mapperd(peer);
963
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
964
                                                        mapper->mbportno, X_ALLOC);
965
        if (!req){
966
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
967
                                "(Map: %s [%llu]",
968
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
969
                goto out_err;
970
        }
971

    
972
        r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
973
        if (r < 0){
974
                XSEGLOG2(&lc, E, "Cannot prepare write object");
975
                goto out_put;
976
        }
977
        req->op = X_WRITE;
978

    
979
        r = xseg_set_req_data(peer->xseg, req, pr);
980
        if (r < 0){
981
                XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
982
                                "(Map: %s [%llu]",
983
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
984
                goto out_put;
985
        }
986
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
987
        if (p == NoPort){
988
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
989
                                "(Map: %s [%llu]",
990
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
991
                goto out_unset;
992
        }
993
        r = xseg_signal(peer->xseg, p);
994
        if (r < 0)
995
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
996

    
997
        XSEGLOG2(&lc, I, "Writing object %s \n\t"
998
                        "Map: %s [%llu]",
999
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1000

    
1001
        return req;
1002

    
1003
out_unset:
1004
        xseg_get_req_data(peer->xseg, req, &dummy);
1005
out_put:
1006
        xseg_put_request(peer->xseg, req, pr->portno);
1007
out_err:
1008
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
1009
                        "(Map: %s [%llu]",
1010
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1011
        return NULL;
1012
}
1013

    
1014
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
1015
{
1016
        int r;
1017
        void *dummy;
1018
        struct peerd *peer = pr->peer;
1019
        struct mapperd *mapper = __get_mapperd(peer);
1020
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1021
                                                        mapper->mbportno, X_ALLOC);
1022
        if (!req){
1023
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
1024
                goto out_err;
1025
        }
1026

    
1027
        r = map_functions[map->version].prepare_write_map(pr, map, req);
1028
        if (r < 0){
1029
                XSEGLOG2(&lc, E, "Cannot prepare write map");
1030
                goto out_put;
1031
        }
1032

    
1033
        req->op = X_WRITE;
1034

    
1035
        r = xseg_set_req_data(peer->xseg, req, pr);
1036
        if (r < 0){
1037
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1038
                                map->volume);
1039
                goto out_put;
1040
        }
1041
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1042
        if (p == NoPort){
1043
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1044
                                map->volume);
1045
                goto out_unset;
1046
        }
1047
        r = xseg_signal(peer->xseg, p);
1048
        if (r < 0)
1049
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1050

    
1051
        map->flags |= MF_MAP_WRITING;
1052
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1053
        return req;
1054

    
1055
out_unset:
1056
        xseg_get_req_data(peer->xseg, req, &dummy);
1057
out_put:
1058
        xseg_put_request(peer->xseg, req, pr->portno);
1059
out_err:
1060
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1061
        return NULL;
1062
}
1063

    
1064
static int write_map(struct peer_req* pr, struct map *map)
1065
{
1066
        int r = 0;
1067
        struct peerd *peer = pr->peer;
1068
        struct xseg_request *req = __write_map(pr, map);
1069
        if (!req)
1070
                return -1;
1071
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1072
        if (req->state & XS_FAILED)
1073
                r = -1;
1074
        xseg_put_request(peer->xseg, req, pr->portno);
1075
        map->flags &= ~MF_MAP_WRITING;
1076
        return r;
1077
}
1078

    
1079
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1080
{
1081
        int r;
1082
        xport p;
1083
        struct xseg_request *req;
1084
        struct peerd *peer = pr->peer;
1085
        struct mapperd *mapper = __get_mapperd(peer);
1086
        void *dummy;
1087

    
1088
        XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1089

    
1090
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1091
        if (!req){
1092
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1093
                                m->volume);
1094
                goto out_fail;
1095
        }
1096

    
1097
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1098
        if (r < 0){
1099
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1100
                                m->volume);
1101
                goto out_put;
1102
        }
1103

    
1104
        char *reqtarget = xseg_get_target(peer->xseg, req);
1105
        if (!reqtarget)
1106
                goto out_put;
1107
        strncpy(reqtarget, m->volume, req->targetlen);
1108
        req->op = X_READ;
1109
        req->size = block_size;
1110
        req->offset = 0;
1111
        r = xseg_set_req_data(peer->xseg, req, pr);
1112
        if (r < 0){
1113
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1114
                                m->volume);
1115
                goto out_put;
1116
        }
1117
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1118
        if (p == NoPort){
1119
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1120
                                m->volume);
1121
                goto out_unset;
1122
        }
1123
        r = xseg_signal(peer->xseg, p);
1124

    
1125
        m->flags |= MF_MAP_LOADING;
1126
        XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1127
        return req;
1128

    
1129
out_unset:
1130
        xseg_get_req_data(peer->xseg, req, &dummy);
1131
out_put:
1132
        xseg_put_request(peer->xseg, req, pr->portno);
1133
out_fail:
1134
        return NULL;
1135
}
1136

    
1137
static int read_map (struct map *map, unsigned char *buf)
1138
{
1139
        char nulls[SHA256_DIGEST_SIZE];
1140
        memset(nulls, 0, SHA256_DIGEST_SIZE);
1141

    
1142
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1143
        if (r) {
1144
                XSEGLOG2(&lc, E, "Read zeros");
1145
                return -1;
1146
        }
1147
        //type 1, archip type, type 0 pithos map
1148
        int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1149
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1150
        uint32_t version;
1151
        if (type)
1152
                version = *(uint32_t *) (buf); //version should always be the first uint32_t
1153
        else
1154
                version = 0;
1155
        if (version > MAP_LATEST_VERSION){
1156
                XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1157
                                map->volume, version);
1158
                return -1;
1159
        }
1160

    
1161
        r = map_functions[version].read_map(map, buf);
1162
        if (r < 0){
1163
                XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1164
                return -1;
1165
        }
1166

    
1167
        print_map(map);
1168
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1169
        return 0;
1170

    
1171
}
1172

    
1173
static int load_map(struct peer_req *pr, struct map *map)
1174
{
1175
        int r = 0;
1176
        struct xseg_request *req;
1177
        struct peerd *peer = pr->peer;
1178
        req = __load_map(pr, map);
1179
        if (!req)
1180
                return -1;
1181
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1182
        map->flags &= ~MF_MAP_LOADING;
1183
        if (req->state & XS_FAILED){
1184
                XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1185
                xseg_put_request(peer->xseg, req, pr->portno);
1186
                return -1;
1187
        }
1188
        r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1189
        xseg_put_request(peer->xseg, req, pr->portno);
1190
        return r;
1191
}
1192

    
1193
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1194
                                                uint32_t flags)
1195
{
1196
        int r;
1197
        xport p;
1198
        struct xseg_request *req;
1199
        struct peerd *peer = pr->peer;
1200
        struct mapperd *mapper = __get_mapperd(peer);
1201
        void *dummy;
1202

    
1203
        XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1204

    
1205
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1206
        if (!req){
1207
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1208
                                m->volume);
1209
                goto out_fail;
1210
        }
1211

    
1212
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1213
        if (r < 0){
1214
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1215
                                m->volume);
1216
                goto out_put;
1217
        }
1218

    
1219
        char *reqtarget = xseg_get_target(peer->xseg, req);
1220
        if (!reqtarget)
1221
                goto out_put;
1222
        strncpy(reqtarget, m->volume, req->targetlen);
1223
        req->op = X_ACQUIRE;
1224
        req->size = block_size;
1225
        req->offset = 0;
1226
        if (!(flags & MF_FORCE))
1227
                req->flags = XF_NOSYNC;
1228
        r = xseg_set_req_data(peer->xseg, req, pr);
1229
        if (r < 0){
1230
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1231
                                m->volume);
1232
                goto out_put;
1233
        }
1234
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1235
        if (p == NoPort){ 
1236
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1237
                                m->volume);
1238
                goto out_unset;
1239
        }
1240
        r = xseg_signal(peer->xseg, p);
1241

    
1242
        m->flags |= MF_MAP_OPENING;
1243
        XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1244
        return req;
1245

    
1246
out_unset:
1247
        xseg_get_req_data(peer->xseg, req, &dummy);
1248
out_put:
1249
        xseg_put_request(peer->xseg, req, pr->portno);
1250
out_fail:
1251
        return NULL;
1252
}
1253

    
1254
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1255
{
1256
        int err;
1257
        struct xseg_request *req;
1258
        struct peerd *peer = pr->peer;
1259

    
1260
        req = __open_map(pr, map, flags);
1261
        if (!req){
1262
                return -1;
1263
        }
1264
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1265
        map->flags &= ~MF_MAP_OPENING;
1266
        err = req->state & XS_FAILED;
1267
        xseg_put_request(peer->xseg, req, pr->portno);
1268
        if (err)
1269
                return -1;
1270
        else
1271
                map->flags |= MF_MAP_EXCLUSIVE;
1272
        return 0;
1273
}
1274

    
1275
/*
1276
 * copy up functions
1277
 */
1278

    
1279
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1280
{
1281
        int r = 0;
1282
        if (mn){
1283
                XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1284
                                req, mn, mio);
1285
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1286
                if (r == -XHASH_ERESIZE) {
1287
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1288
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1289
                        if (!new_hashmap)
1290
                                goto out;
1291
                        mio->copyups_nodes = new_hashmap;
1292
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1293
                }
1294
                if (r < 0)
1295
                        XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1296
                                        req, mn, mio);
1297
        }
1298
        else {
1299
                XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1300
                                req, mio);
1301
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1302
                if (r == -XHASH_ERESIZE) {
1303
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1304
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
1305
                        if (!new_hashmap)
1306
                                goto out;
1307
                        mio->copyups_nodes = new_hashmap;
1308
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1309
                }
1310
                if (r < 0)
1311
                        XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1312
                                        req, mio);
1313
        }
1314
out:
1315
        return r;
1316
}
1317

    
1318
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1319
{
1320
        struct map_node *mn;
1321
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1322
        if (r < 0){
1323
                XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1324
                return NULL;
1325
        }
1326
        XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1327
        return mn;
1328
}
1329

    
1330
static struct xseg_request * __snapshot_object(struct peer_req *pr,
1331
                                                struct map_node *mn)
1332
{
1333
        struct peerd *peer = pr->peer;
1334
        struct mapperd *mapper = __get_mapperd(peer);
1335
        struct mapper_io *mio = __get_mapper_io(pr);
1336
        //struct map *map = mn->map;
1337
        void *dummy;
1338
        int r = -1;
1339
        xport p;
1340

    
1341
        //assert mn->volume != zero_block
1342
        //assert mn->flags & MF_OBJECT_EXIST
1343
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1344
                                                mapper->bportno, X_ALLOC);
1345
        if (!req){
1346
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1347
                goto out_err;
1348
        }
1349
        r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1350
                                sizeof(struct xseg_request_snapshot));
1351
        if (r < 0){
1352
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1353
                goto out_put;
1354
        }
1355

    
1356
        char *target = xseg_get_target(peer->xseg, req);
1357
        strncpy(target, mn->object, req->targetlen);
1358

    
1359
        struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1360
        xsnapshot->target[0] = 0;
1361
        xsnapshot->targetlen = 0;
1362

    
1363
        req->offset = 0;
1364
        req->size = block_size;
1365
        req->op = X_SNAPSHOT;
1366
        r = xseg_set_req_data(peer->xseg, req, pr);
1367
        if (r<0){
1368
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1369
                goto out_put;
1370
        }
1371
        r = __set_copyup_node(mio, req, mn);
1372
        if (r < 0)
1373
                goto out_unset;
1374
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1375
        if (p == NoPort) {
1376
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1377
                goto out_mapper_unset;
1378
        }
1379
        xseg_signal(peer->xseg, p);
1380

    
1381
        mn->flags |= MF_OBJECT_SNAPSHOTTING;
1382
        XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1383
        return req;
1384

    
1385
out_mapper_unset:
1386
        __set_copyup_node(mio, req, NULL);
1387
out_unset:
1388
        xseg_get_req_data(peer->xseg, req, &dummy);
1389
out_put:
1390
        xseg_put_request(peer->xseg, req, pr->portno);
1391
out_err:
1392
        XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1393
        return NULL;
1394
}
1395

    
1396
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1397
{
1398
        struct mapperd *mapper = __get_mapperd(peer);
1399
        struct mapper_io *mio = __get_mapper_io(pr);
1400
        struct map *map = mn->map;
1401
        void *dummy;
1402
        int r = -1;
1403
        xport p;
1404

    
1405
        uint32_t newtargetlen;
1406
        char new_target[MAX_OBJECT_LEN + 1];
1407
        unsigned char sha[SHA256_DIGEST_SIZE];
1408

    
1409
        strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1410

    
1411
        char tmp[XSEG_MAX_TARGETLEN + 1];
1412
        uint32_t tmplen;
1413
        strncpy(tmp, map->volume, map->volumelen);
1414
        sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1415
        tmp[XSEG_MAX_TARGETLEN] = 0;
1416
        tmplen = strlen(tmp);
1417
        SHA256((unsigned char *)tmp, tmplen, sha);
1418
        hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1419
        newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1420

    
1421

    
1422
        if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1423
                goto copyup_zeroblock;
1424

    
1425
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1426
                                                mapper->bportno, X_ALLOC);
1427
        if (!req){
1428
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1429
                goto out_err;
1430
        }
1431
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1432
                                sizeof(struct xseg_request_copy));
1433
        if (r < 0){
1434
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1435
                goto out_put;
1436
        }
1437

    
1438
        char *target = xseg_get_target(peer->xseg, req);
1439
        strncpy(target, new_target, req->targetlen);
1440

    
1441
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1442
        strncpy(xcopy->target, mn->object, mn->objectlen);
1443
        xcopy->targetlen = mn->objectlen;
1444

    
1445
        req->offset = 0;
1446
        req->size = block_size;
1447
        req->op = X_COPY;
1448
        r = xseg_set_req_data(peer->xseg, req, pr);
1449
        if (r<0){
1450
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1451
                goto out_put;
1452
        }
1453
        r = __set_copyup_node(mio, req, mn);
1454
        if (r < 0)
1455
                goto out_unset;
1456
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1457
        if (p == NoPort) {
1458
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1459
                goto out_mapper_unset;
1460
        }
1461
        xseg_signal(peer->xseg, p);
1462
//        mio->copyups++;
1463

    
1464
        mn->flags |= MF_OBJECT_COPYING;
1465
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1466
        return req;
1467

    
1468
out_mapper_unset:
1469
        __set_copyup_node(mio, req, NULL);
1470
out_unset:
1471
        xseg_get_req_data(peer->xseg, req, &dummy);
1472
out_put:
1473
        xseg_put_request(peer->xseg, req, pr->portno);
1474
out_err:
1475
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1476
        return NULL;
1477

    
1478
copyup_zeroblock:
1479
        XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1480
                        "Proceeding in writing the new object in map");
1481
        /* construct a tmp map_node for writing purposes */
1482
        struct map_node newmn = *mn;
1483
        newmn.flags = MF_OBJECT_EXIST;
1484
        strncpy(newmn.object, new_target, newtargetlen);
1485
        newmn.object[newtargetlen] = 0;
1486
        newmn.objectlen = newtargetlen;
1487
        newmn.objectidx = mn->objectidx; 
1488
        req = object_write(peer, pr, map, &newmn);
1489
        r = __set_copyup_node(mio, req, mn);
1490
        if (r < 0)
1491
                return NULL;
1492
        if (!req){
1493
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
1494
                                "\n\t of map %s [%llu]",
1495
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
1496
                return NULL;
1497
        }
1498
        mn->flags |= MF_OBJECT_WRITING;
1499
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1500
        return req;
1501
}
1502

    
1503
static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1504
{
1505
        void *dummy;
1506
        struct peerd *peer = pr->peer;
1507
        struct mapperd *mapper = __get_mapperd(peer);
1508
        struct mapper_io *mio = __get_mapper_io(pr);
1509
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1510
                                                        mapper->bportno, X_ALLOC);
1511
        XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1512
        if (!req){
1513
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1514
                goto out_err;
1515
        }
1516
        int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1517
        if (r < 0){
1518
                XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1519
                goto out_put;
1520
        }
1521
        char *target = xseg_get_target(peer->xseg, req);
1522
        strncpy(target, mn->object, req->targetlen);
1523
        req->op = X_DELETE;
1524
        req->size = req->datalen;
1525
        req->offset = 0;
1526
        r = xseg_set_req_data(peer->xseg, req, pr);
1527
        if (r < 0){
1528
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1529
                goto out_put;
1530
        }
1531
        r = __set_copyup_node(mio, req, mn);
1532
        if (r < 0)
1533
                goto out_unset;
1534
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1535
        if (p == NoPort){
1536
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1537
                goto out_mapper_unset;
1538
        }
1539
        r = xseg_signal(peer->xseg, p);
1540
        mn->flags |= MF_OBJECT_DELETING;
1541
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1542
        return req;
1543

    
1544
out_mapper_unset:
1545
        __set_copyup_node(mio, req, NULL);
1546
out_unset:
1547
        xseg_get_req_data(peer->xseg, req, &dummy);
1548
out_put:
1549
        xseg_put_request(peer->xseg, req, pr->portno);
1550
out_err:
1551
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1552
        return NULL;
1553
}
1554

    
1555
static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1556
{
1557
        void *dummy;
1558
        struct peerd *peer = pr->peer;
1559
        struct mapperd *mapper = __get_mapperd(peer);
1560
        struct mapper_io *mio = __get_mapper_io(pr);
1561
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1562
                                                        mapper->mbportno, X_ALLOC);
1563
        XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1564
        if (!req){
1565
                XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1566
                goto out_err;
1567
        }
1568
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1569
        if (r < 0){
1570
                XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1571
                goto out_put;
1572
        }
1573
        char *target = xseg_get_target(peer->xseg, req);
1574
        strncpy(target, map->volume, req->targetlen);
1575
        req->op = X_DELETE;
1576
        req->size = req->datalen;
1577
        req->offset = 0;
1578
        r = xseg_set_req_data(peer->xseg, req, pr);
1579
        if (r < 0){
1580
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1581
                goto out_put;
1582
        }
1583
        /* do not check return value. just make sure there is no node set */
1584
        __set_copyup_node(mio, req, NULL);
1585
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1586
        if (p == NoPort){
1587
                XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1588
                goto out_unset;
1589
        }
1590
        r = xseg_signal(peer->xseg, p);
1591
        map->flags |= MF_MAP_DELETING;
1592
        XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1593
        return req;
1594

    
1595
out_unset:
1596
        xseg_get_req_data(peer->xseg, req, &dummy);
1597
out_put:
1598
        xseg_put_request(peer->xseg, req, pr->portno);
1599
out_err:
1600
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1601
        return  NULL;
1602
}
1603

    
1604

    
1605
/* Look up the object node at @index in @map and take a reference on it.
 * Returns NULL when no node is cached for that index. Callers must
 * release the reference with put_mapnode().
 */
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
{
        struct map_node *node;

        node = find_object(map, index);
        if (!node)
                return NULL;
        node->ref++;
        return node;
}
1612

    
1613
static inline void put_mapnode(struct map_node *mn)
1614
{
1615
        mn->ref--;
1616
        if (!mn->ref){
1617
                //clean up mn
1618
                st_cond_destroy(mn->cond);
1619
        }
1620
}
1621

    
1622
static inline void __get_map(struct map *map)
1623
{
1624
        map->ref++;
1625
}
1626

    
1627
/* Drop a reference on @map. On the last reference, tear the map down:
 * wait out any pending per-object operation, drop each node's two
 * references (initial + the one taken here), free the node storage
 * (allocated as one chunk, addressed via object index 0) and the map.
 */
static inline void put_map(struct map *map)
{
        struct map_node *mn;
        XSEGLOG2(&lc, D, "Putting map %lx %s. ref %u", map, map->volume, map->ref);
        map->ref--;
        if (!map->ref){
                XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
                //clean up map
                uint64_t i;
                for (i = 0; i < calc_map_obj(map); i++) {
                        mn = get_mapnode(map, i);
                        if (mn) {
                                //make sure all pending operations on all objects are completed
                                //this should never happen, since the map is going away...
                                if (mn->flags & MF_OBJECT_NOT_READY)
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
                                mn->flags |= MF_OBJECT_DESTROYED;
                                put_mapnode(mn); //matching mn->ref = 1 on mn init
                                put_mapnode(mn); //matching the get_mapnode above
                                //assert mn->ref == 0;
                        }
                }
                /* node 0 is the base of the single allocation holding all
                 * map nodes, so freeing it releases every node at once */
                mn = find_object(map, 0);
                if (mn)
                        free(mn);
                XSEGLOG2(&lc, I, "Freed map %s", map->volume);
                free(map);
        }
}
1656

    
1657
static struct map * create_map(struct mapperd *mapper, char *name,
1658
                                uint32_t namelen, uint32_t flags)
1659
{
1660
        int r;
1661
        if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1662
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1663
                                        namelen, MAX_VOLUME_LEN);
1664
                return NULL;
1665
        }
1666
        struct map *m = malloc(sizeof(struct map));
1667
        if (!m){
1668
                XSEGLOG2(&lc, E, "Cannot allocate map ");
1669
                goto out_err;
1670
        }
1671
        m->size = -1;
1672
        if (flags & MF_ARCHIP){
1673
                strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1674
                strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1675
                m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1676
                m->volumelen = MAPPER_PREFIX_LEN + namelen;
1677
                m->version = 1; /* keep this hardcoded for now */
1678
        }
1679
        else {
1680
                strncpy(m->volume, name, namelen);
1681
                m->volume[namelen] = 0;
1682
                m->volumelen = namelen;
1683
                m->version = 0; /* version 0 should be pithos maps */
1684
        }
1685
        m->flags = 0;
1686
        m->objects = xhash_new(3, INTEGER); 
1687
        if (!m->objects){
1688
                XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1689
                                m->volume);
1690
                goto out_map;
1691
        }
1692
        m->ref = 1;
1693
        m->waiters = 0;
1694
        m->cond = st_cond_new(); //FIXME err check;
1695
        r = insert_map(mapper, m);
1696
        if (r < 0){
1697
                XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1698
                goto out_hash;
1699
        }
1700

    
1701
        return m;
1702

    
1703
out_hash:
1704
        xhash_free(m->objects);
1705
out_map:
1706
        XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1707
        free(m);
1708
out_err:
1709
        return NULL;
1710
}
1711

    
1712

    
1713

    
1714
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1715
{
1716
        struct peerd *peer = pr->peer;
1717
        struct mapperd *mapper = __get_mapperd(peer);
1718
        (void)mapper;
1719
        struct mapper_io *mio = __get_mapper_io(pr);
1720
        struct map_node *mn = __get_copyup_node(mio, req);
1721

    
1722
        __set_copyup_node(mio, req, NULL);
1723

    
1724
        //assert req->op = X_DELETE;
1725
        //assert pr->req->op = X_DELETE only map deletions make delete requests
1726
        //assert mio->del_pending > 0
1727
        XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
1728
        mio->del_pending--;
1729

    
1730
        if (req->state & XS_FAILED){
1731
                mio->err = 1;
1732
        }
1733
        if (mn){
1734
                XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
1735
                                mn, mn->object, mio, req);
1736
                // assert mn->flags & MF_OBJECT_DELETING
1737
                mn->flags &= ~MF_OBJECT_DELETING;
1738
                mn->flags |= MF_OBJECT_DESTROYED;
1739
                signal_mapnode(mn);
1740
                /* put mapnode here, matches get_mapnode on do_destroy */
1741
                put_mapnode(mn);
1742
        } else {
1743
                XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
1744
                                mio, req);
1745
        }
1746
        xseg_put_request(peer->xseg, req, pr->portno);
1747
        signal_pr(pr);
1748
}
1749

    
1750
void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1751
{
1752
        struct peerd *peer = pr->peer;
1753
        struct mapperd *mapper = __get_mapperd(peer);
1754
        (void)mapper;
1755
        struct mapper_io *mio = __get_mapper_io(pr);
1756
        struct map_node *mn = __get_copyup_node(mio, req);
1757
        if (!mn){
1758
                XSEGLOG2(&lc, E, "Cannot get map node");
1759
                goto out_err;
1760
        }
1761
        __set_copyup_node(mio, req, NULL);
1762

    
1763
        if (req->state & XS_FAILED){
1764
                if (req->op == X_DELETE){
1765
                        XSEGLOG2(&lc, E, "Delete req failed");
1766
                        goto out_ok;
1767
                }
1768
                XSEGLOG2(&lc, E, "Req failed");
1769
                mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1770
                mn->flags &= ~MF_OBJECT_WRITING;
1771
                goto out_err;
1772
        }
1773

    
1774
        if (req->op == X_WRITE) {
1775
                char old_object_name[MAX_OBJECT_LEN + 1];
1776
                uint32_t old_objectlen;
1777

    
1778
                char *target = xseg_get_target(peer->xseg, req);
1779
                (void)target;
1780
                //assert mn->flags & MF_OBJECT_WRITING
1781
                mn->flags &= ~MF_OBJECT_WRITING;
1782
                strncpy(old_object_name, mn->object, mn->objectlen);
1783
                old_objectlen = mn->objectlen;
1784

    
1785
                struct map_node tmp;
1786
                char *data = xseg_get_data(peer->xseg, req);
1787
                map_to_object(&tmp, (unsigned char *) data);
1788
                mn->flags &= ~MF_OBJECT_EXIST;
1789

    
1790
                strncpy(mn->object, tmp.object, tmp.objectlen);
1791
                mn->object[tmp.objectlen] = 0;
1792
                mn->objectlen = tmp.objectlen;
1793
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1794
                //signal_mapnode since Snapshot was successfull
1795
                signal_mapnode(mn);
1796

    
1797
                //do delete old object
1798
                strncpy(tmp.object, old_object_name, old_objectlen);
1799
                tmp.object[old_objectlen] = 0;
1800
                tmp.objectlen = old_objectlen;
1801
                tmp.flags = MF_OBJECT_EXIST;
1802
                struct xseg_request *xreq = __delete_object(pr, &tmp);
1803
                if (!xreq){
1804
                        //just a warning. Snapshot was successfull
1805
                        XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1806
                        goto out_ok;
1807
                }
1808
                //overwrite copyup node, since tmp is a stack dummy variable
1809
                __set_copyup_node (mio, xreq, mn);
1810
                XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1811
        } else if (req->op == X_SNAPSHOT) {
1812
                //issue write_object;
1813
                mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1814
                struct map *map = mn->map;
1815
                if (!map){
1816
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1817
                        goto out_err;
1818
                }
1819

    
1820
                /* construct a tmp map_node for writing purposes */
1821
                //char *target = xseg_get_target(peer->xseg, req);
1822
                struct map_node newmn = *mn;
1823
                newmn.flags = 0;
1824
                struct xseg_reply_snapshot *xreply;
1825
                xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1826
                //assert xreply->targetlen !=0
1827
                //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1828
                //xreply->target[xreply->targetlen] = 0;
1829
                //assert xreply->target valid
1830
                strncpy(newmn.object, xreply->target, xreply->targetlen);
1831
                newmn.object[req->targetlen] = 0;
1832
                newmn.objectlen = req->targetlen;
1833
                newmn.objectidx = mn->objectidx;
1834
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1835
                if (!xreq){
1836
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
1837
                                        "\n\t of map %s [%llu]",
1838
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1839
                        goto out_err;
1840
                }
1841
                mn->flags |= MF_OBJECT_WRITING;
1842
                __set_copyup_node (mio, xreq, mn);
1843

    
1844
                XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1845
        } else if (req->op == X_DELETE){
1846
                //deletion of the old block completed
1847
                XSEGLOG2(&lc, I, "Deletion of completed");
1848
                goto out_ok;
1849
                ;
1850
        } else {
1851
                //wtf??
1852
                ;
1853
        }
1854

    
1855
out:
1856
        xseg_put_request(peer->xseg, req, pr->portno);
1857
        return;
1858

    
1859
out_err:
1860
        mio->snap_pending--;
1861
        XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1862
        mio->err = 1;
1863
        if (mn)
1864
                signal_mapnode(mn);
1865
        signal_pr(pr);
1866
        goto out;
1867

    
1868
out_ok:
1869
        mio->snap_pending--;
1870
        signal_pr(pr);
1871
        goto out;
1872

    
1873

    
1874
}
1875
/*
 * Completion callback for the copy-on-write chains issued by req2objs().
 *
 * Each chain has two steps, keyed on req->op:
 *   X_COPY:  the blocker copied the object; issue a map-node write that
 *            records the new object name.
 *   X_WRITE: the map-node write completed; install the new name on the
 *            node, decrement mio->copyups and wake the waiting request.
 *
 * mio->copyups is decremented only when a chain terminates (X_WRITE
 * success or out_err), never on the intermediate X_COPY step.
 */
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn = __get_copyup_node(mio, req);
        if (!mn){
                XSEGLOG2(&lc, E, "Cannot get map node");
                goto out_err;
        }
        __set_copyup_node(mio, req, NULL);

        if (req->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Req failed");
                mn->flags &= ~MF_OBJECT_COPYING;
                mn->flags &= ~MF_OBJECT_WRITING;
                goto out_err;
        }
        if (req->op == X_WRITE) {
                char *target = xseg_get_target(peer->xseg, req);
                (void)target;
                __set_copyup_node(mio, req, NULL);
                //assert mn->flags & MF_OBJECT_WRITING
                mn->flags &= ~MF_OBJECT_WRITING;

                struct map_node tmp;
                char *data = xseg_get_data(peer->xseg, req);
                map_to_object(&tmp, (unsigned char *) data);
                mn->flags |= MF_OBJECT_EXIST;
                /* sanity check: after the write completes, EXIST must be
                 * the only flag left on the node */
                if (mn->flags != MF_OBJECT_EXIST){
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
                        goto out_err;
                }
                /* install the object name recorded by the map-node write */
                strncpy(mn->object, tmp.object, tmp.objectlen);
                mn->object[tmp.objectlen] = 0;
                mn->objectlen = tmp.objectlen;
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
                mio->copyups--;
                signal_mapnode(mn);
                signal_pr(pr);
        } else if (req->op == X_COPY) {
                /* copy completed; now persist the new name via object_write */
                mn->flags &= ~MF_OBJECT_COPYING;
                struct map *map = mn->map;
                if (!map){
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
                        goto out_err;
                }

                /* construct a tmp map_node for writing purposes */
                char *target = xseg_get_target(peer->xseg, req);
                struct map_node newmn = *mn;
                newmn.flags = MF_OBJECT_EXIST;
                strncpy(newmn.object, target, req->targetlen);
                newmn.object[req->targetlen] = 0;
                newmn.objectlen = req->targetlen;
                newmn.objectidx = mn->objectidx; 
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
                if (!xreq){
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                        "\n\t of map %s [%llu]",
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
                        goto out_err;
                }
                mn->flags |= MF_OBJECT_WRITING;
                __set_copyup_node (mio, xreq, mn);

                XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        } else {
                /* unexpected op; nothing to do but release the request */
                ;
        }

out:
        xseg_put_request(peer->xseg, req, pr->portno);
        return;

out_err:
        mio->copyups--;
        XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
        mio->err = 1;
        if (mn)
                signal_mapnode(mn);
        signal_pr(pr);
        goto out;

}
1965

    
1966
/* Mapping of one contiguous slice of a client request onto a single
 * object: @mn is a node reference taken with get_mapnode(), @offset and
 * @size give the byte range covered inside that object. */
struct r2o {
        struct map_node *mn;
        uint64_t offset;
        uint64_t size;
};
1971

    
1972
static int req2objs(struct peer_req *pr, struct map *map, int write)
1973
{
1974
        int r = 0;
1975
        struct peerd *peer = pr->peer;
1976
        struct mapper_io *mio = __get_mapper_io(pr);
1977
        char *target = xseg_get_target(peer->xseg, pr->req);
1978
        uint32_t nr_objs = calc_nr_obj(pr->req);
1979
        uint64_t size = sizeof(struct xseg_reply_map) + 
1980
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1981
        uint32_t idx, i;
1982
        uint64_t rem_size, obj_index, obj_offset, obj_size; 
1983
        struct map_node *mn;
1984
        mio->copyups = 0;
1985
        XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1986

    
1987
        /* get map_nodes of request */
1988
        struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1989
        if (!mns){
1990
                XSEGLOG2(&lc, E, "Cannot allocate mns");
1991
                return -1;
1992
        }
1993
        idx = 0;
1994
        rem_size = pr->req->size;
1995
        obj_index = pr->req->offset / block_size;
1996
        obj_offset = pr->req->offset & (block_size -1); //modulo
1997
        obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1998
        mn = get_mapnode(map, obj_index);
1999
        if (!mn) {
2000
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2001
                r = -1;
2002
                goto out;
2003
        }
2004
        mns[idx].mn = mn;
2005
        mns[idx].offset = obj_offset;
2006
        mns[idx].size = obj_size;
2007
        rem_size -= obj_size;
2008
        while (rem_size > 0) {
2009
                idx++;
2010
                obj_index++;
2011
                obj_offset = 0;
2012
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
2013
                rem_size -= obj_size;
2014
                mn = get_mapnode(map, obj_index);
2015
                if (!mn) {
2016
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2017
                        r = -1;
2018
                        goto out;
2019
                }
2020
                mns[idx].mn = mn;
2021
                mns[idx].offset = obj_offset;
2022
                mns[idx].size = obj_size;
2023
        }
2024
        if (write) {
2025
                int can_wait = 0;
2026
                mio->cb=copyup_cb;
2027
                /* do a first scan and issue as many copyups as we can.
2028
                 * then retry and wait when an object is not ready.
2029
                 * this could be done better, since now we wait also on the
2030
                 * pending copyups
2031
                 */
2032
                int j;
2033
                for (j = 0; j < 2 && !mio->err; j++) {
2034
                        for (i = 0; i < (idx+1); i++) {
2035
                                mn = mns[i].mn;
2036
                                //do copyups
2037
                                if (mn->flags & MF_OBJECT_NOT_READY){
2038
                                        if (!can_wait)
2039
                                                continue;
2040
                                        if (mn->flags & MF_OBJECT_NOT_READY)
2041
                                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2042
                                        if (mn->flags & MF_OBJECT_DESTROYED){
2043
                                                mio->err = 1;
2044
                                                continue;
2045
                                        }
2046
                                }
2047

    
2048
                                if (!(mn->flags & MF_OBJECT_EXIST)) {
2049
                                        //calc new_target, copy up object
2050
                                        if (copyup_object(peer, mn, pr) == NULL){
2051
                                                XSEGLOG2(&lc, E, "Error in copy up object");
2052
                                                mio->err = 1;
2053
                                        } else {
2054
                                                mio->copyups++;
2055
                                        }
2056
                                }
2057

    
2058
                                if (mio->err){
2059
                                        XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2060
                                        break;
2061
                                }
2062
                        }
2063
                        can_wait = 1;
2064
                }
2065
                if (mio->copyups > 0)
2066
                        wait_on_pr(pr, mio->copyups > 0);
2067
        }
2068

    
2069
        if (mio->err){
2070
                r = -1;
2071
                XSEGLOG2(&lc, E, "Mio->err");
2072
                goto out;
2073
        }
2074

    
2075
        /* resize request to fit reply */
2076
        char buf[XSEG_MAX_TARGETLEN];
2077
        strncpy(buf, target, pr->req->targetlen);
2078
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2079
        if (r < 0) {
2080
                XSEGLOG2(&lc, E, "Cannot resize request");
2081
                goto out;
2082
        }
2083
        target = xseg_get_target(peer->xseg, pr->req);
2084
        strncpy(target, buf, pr->req->targetlen);
2085

    
2086
        /* structure reply */
2087
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2088
        reply->cnt = nr_objs;
2089
        for (i = 0; i < (idx+1); i++) {
2090
                strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2091
                reply->segs[i].targetlen = mns[i].mn->objectlen;
2092
                reply->segs[i].offset = mns[i].offset;
2093
                reply->segs[i].size = mns[i].size;
2094
        }
2095
out:
2096
        for (i = 0; i < idx; i++) {
2097
                put_mapnode(mns[i].mn);
2098
        }
2099
        free(mns);
2100
        mio->cb = NULL;
2101
        return r;
2102
}
2103

    
2104
static int do_dropcache(struct peer_req *pr, struct map *map)
2105
{
2106
        struct map_node *mn;
2107
        struct peerd *peer = pr->peer;
2108
        struct mapperd *mapper = __get_mapperd(peer);
2109
        uint64_t i;
2110
        XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
2111
        map->flags |= MF_MAP_DROPPING_CACHE;
2112
        for (i = 0; i < calc_map_obj(map); i++) {
2113
                mn = get_mapnode(map, i);
2114
                if (mn) {
2115
                        if (!(mn->flags & MF_OBJECT_DESTROYED)){
2116
                                //make sure all pending operations on all objects are completed
2117
                                if (mn->flags & MF_OBJECT_NOT_READY)
2118
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2119
                                mn->flags |= MF_OBJECT_DESTROYED;
2120
                        }
2121
                        put_mapnode(mn);
2122
                }
2123
        }
2124
        map->flags &= ~MF_MAP_DROPPING_CACHE;
2125
        map->flags |= MF_MAP_DESTROYED;
2126
        remove_map(mapper, map);
2127
        XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
2128
        put_map(map);        // put map here to destroy it (matches m->ref = 1 on map create)
2129
        return 0;
2130
}
2131

    
2132
static int do_info(struct peer_req *pr, struct map *map)
2133
{
2134
        struct peerd *peer = pr->peer;
2135
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2136
        xinfo->size = map->size;
2137
        return 0;
2138
}
2139

    
2140

    
2141
static int do_open(struct peer_req *pr, struct map *map)
2142
{
2143
        if (map->flags & MF_MAP_EXCLUSIVE){
2144
                return 0;
2145
        }
2146
        else {
2147
                return -1;
2148
        }
2149
}
2150

    
2151
static int do_close(struct peer_req *pr, struct map *map)
2152
{
2153
        if (map->flags & MF_MAP_EXCLUSIVE){
2154
                /* do not drop cache if close failed and map not deleted */
2155
                if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2156
                        return -1;
2157
        }
2158
        return do_dropcache(pr, map);
2159
}
2160

    
2161
static int do_snapshot(struct peer_req *pr, struct map *map)
2162
{
2163
        uint64_t i;
2164
        struct peerd *peer = pr->peer;
2165
        struct mapper_io *mio = __get_mapper_io(pr);
2166
        struct map_node *mn;
2167
        struct xseg_request *req;
2168

    
2169
        if (!(map->flags & MF_MAP_EXCLUSIVE)){
2170
                XSEGLOG2(&lc, E, "Map was not opened exclusively");
2171
                return -1;
2172
        }
2173
        XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2174
        map->flags |= MF_MAP_SNAPSHOTTING;
2175

    
2176
        uint64_t nr_obj = calc_map_obj(map);
2177
        mio->cb = snapshot_cb;
2178
        mio->snap_pending = 0;
2179
        mio->err = 0;
2180
        for (i = 0; i < nr_obj; i++){
2181

    
2182
                /* throttle pending snapshots
2183
                 * this should be nr_ops of the blocker, but since we don't know
2184
                 * that, we assume based on our own nr_ops
2185
                 */
2186
                if (mio->snap_pending >= peer->nr_ops)
2187
                        wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2188

    
2189
                mn = get_mapnode(map, i);
2190
                if (!mn)
2191
                        //warning?
2192
                        continue;
2193
                if (!(mn->flags & MF_OBJECT_EXIST)){
2194
                        put_mapnode(mn);
2195
                        continue;
2196
                }
2197
                // make sure all pending operations on all objects are completed
2198
                if (mn->flags & MF_OBJECT_NOT_READY)
2199
                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2200

    
2201
                /* TODO will this ever happen?? */
2202
                if (mn->flags & MF_OBJECT_DESTROYED){
2203
                        put_mapnode(mn);
2204
                        continue;
2205
                }
2206

    
2207
                req = __snapshot_object(pr, mn);
2208
                if (!req){
2209
                        mio->err = 1;
2210
                        put_mapnode(mn);
2211
                        break;
2212
                }
2213
                mio->snap_pending++;
2214
                /* do not put_mapnode here. cb does that */
2215
        }
2216

    
2217
        if (mio->snap_pending > 0)
2218
                wait_on_pr(pr, mio->snap_pending > 0);
2219
        mio->cb = NULL;
2220

    
2221
        if (mio->err)
2222
                goto out_err;
2223

    
2224
        /* calculate name of snapshot */
2225
        struct map tmp_map = *map;
2226
        unsigned char sha[SHA256_DIGEST_SIZE];
2227
        unsigned char *buf = malloc(block_size);
2228
        char newvolumename[MAX_VOLUME_LEN];
2229
        uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2230
        uint64_t pos = 0;
2231
        uint64_t max_objidx = calc_map_obj(map);
2232
        int r;
2233

    
2234
        for (i = 0; i < max_objidx; i++) {
2235
                mn = find_object(map, i);
2236
                if (!mn){
2237
                        XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2238
                                        (unsigned long long) i, map->volume);
2239
                        goto out_err;
2240
                }
2241
                v0_object_to_map(mn, buf+pos);
2242
                pos += v0_objectsize_in_map;
2243
        }
2244
//        SHA256(buf, pos, sha);
2245
        merkle_hash(buf, pos, sha);
2246
        hexlify(sha, newvolumename);
2247
        strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2248
        tmp_map.volumelen = newvolumenamelen;
2249
        free(buf);
2250
        tmp_map.version = 0; // set volume version to pithos image
2251

    
2252
        /* write the map of the Snapshot */
2253
        r = write_map(pr, &tmp_map);
2254
        if (r < 0)
2255
                goto out_err;
2256
        char targetbuf[XSEG_MAX_TARGETLEN];
2257
        char *target = xseg_get_target(peer->xseg, pr->req);
2258
        strncpy(targetbuf, target, pr->req->targetlen);
2259
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2260
                        sizeof(struct xseg_reply_snapshot));
2261
        if (r < 0){
2262
                XSEGLOG2(&lc, E, "Cannot resize request");
2263
                goto out_err;
2264
        }
2265
        target = xseg_get_target(peer->xseg, pr->req);
2266
        strncpy(target, targetbuf, pr->req->targetlen);
2267

    
2268
        struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2269
                                                xseg_get_data(peer->xseg, pr->req);
2270
        strncpy(xreply->target, newvolumename, newvolumenamelen);
2271
        xreply->targetlen = newvolumenamelen;
2272
        map->flags &= ~MF_MAP_SNAPSHOTTING;
2273
        XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2274
        return 0;
2275

    
2276
out_err:
2277
        map->flags &= ~MF_MAP_SNAPSHOTTING;
2278
        XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2279
        return -1;
2280
}
2281

    
2282

    
2283
/*
 * Destroy @map: delete the map object itself (synchronously), then issue
 * throttled, asynchronous deletions for every existing data object
 * (completions handled by deletion_cb), and finally close the map,
 * which also drops its cache.
 *
 * Requires the map to be held exclusively.
 * Returns 0 on success, -1 on failure. Individual object-deletion
 * failures set mio->err but do not abort the loop.
 */
static int do_destroy(struct peer_req *pr, struct map *map)
{
        uint64_t i;
        struct peerd *peer = pr->peer;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn;
        struct xseg_request *req;

        if (!(map->flags & MF_MAP_EXCLUSIVE))
                return -1;

        XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
        /* delete the map object first, waiting for the result */
        req = __delete_map(pr, map);
        if (!req)
                return -1;
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        if (req->state & XS_FAILED){
                xseg_put_request(peer->xseg, req, pr->portno);
                map->flags &= ~MF_MAP_DELETING;
                return -1;
        }
        xseg_put_request(peer->xseg, req, pr->portno);

        uint64_t nr_obj = calc_map_obj(map);
        mio->cb = deletion_cb;
        mio->del_pending = 0;
        mio->err = 0;
        for (i = 0; i < nr_obj; i++){

                /* throttle pending deletions
                 * this should be nr_ops of the blocker, but since we don't know
                 * that, we assume based on our own nr_ops
                 */
                if (mio->del_pending >= peer->nr_ops)
                        wait_on_pr(pr, mio->del_pending >= peer->nr_ops);

                mn = get_mapnode(map, i);
                if (!mn)
                        continue;
                if (mn->flags & MF_OBJECT_DESTROYED){
                        put_mapnode(mn);
                        continue;
                }
                /* a non-existing object has nothing to delete; just mark it */
                if (!(mn->flags & MF_OBJECT_EXIST)){
                        mn->flags |= MF_OBJECT_DESTROYED;
                        put_mapnode(mn);
                        continue;
                }

                // make sure all pending operations on all objects are completed
                if (mn->flags & MF_OBJECT_NOT_READY)
                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

                req = __delete_object(pr, mn);
                if (!req){
                        mio->err = 1;
                        put_mapnode(mn);
                        continue;
                }
                mio->del_pending++;
                /* do not put_mapnode here. cb does that */
        }

        if (mio->del_pending > 0)
                wait_on_pr(pr, mio->del_pending > 0);

        mio->cb = NULL;
        map->flags &= ~MF_MAP_DELETING;
        map->flags |= MF_MAP_DELETED;
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
        /* releases the exclusive lock and drops the cached map state */
        return do_close(pr, map);
}
2355

    
2356
static int do_mapr(struct peer_req *pr, struct map *map)
2357
{
2358
        struct peerd *peer = pr->peer;
2359
        int r = req2objs(pr, map, 0);
2360
        if  (r < 0){
2361
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2362
                                map->volume, 
2363
                                (unsigned long long) pr->req->offset, 
2364
                                (unsigned long long) (pr->req->offset + pr->req->size));
2365
                return -1;
2366
        }
2367
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2368
                        map->volume, 
2369
                        (unsigned long long) pr->req->offset, 
2370
                        (unsigned long long) (pr->req->offset + pr->req->size));
2371
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2372
                        (unsigned long long) pr->req->offset,
2373
                        (unsigned long long) pr->req->size);
2374
        char buf[XSEG_MAX_TARGETLEN+1];
2375
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2376
        int i;
2377
        for (i = 0; i < reply->cnt; i++) {
2378
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2379
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2380
                buf[reply->segs[i].targetlen] = 0;
2381
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2382
                                (unsigned long long) reply->segs[i].offset,
2383
                                (unsigned long long) reply->segs[i].size);
2384
        }
2385
        return 0;
2386
}
2387

    
2388
static int do_mapw(struct peer_req *pr, struct map *map)
2389
{
2390
        struct peerd *peer = pr->peer;
2391
        int r = req2objs(pr, map, 1);
2392
        if  (r < 0){
2393
                XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2394
                                map->volume, 
2395
                                (unsigned long long) pr->req->offset, 
2396
                                (unsigned long long) (pr->req->offset + pr->req->size));
2397
                return -1;
2398
        }
2399
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2400
                        map->volume, 
2401
                        (unsigned long long) pr->req->offset, 
2402
                        (unsigned long long) (pr->req->offset + pr->req->size));
2403
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2404
                        (unsigned long long) pr->req->offset,
2405
                        (unsigned long long) pr->req->size);
2406
        char buf[XSEG_MAX_TARGETLEN+1];
2407
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2408
        int i;
2409
        for (i = 0; i < reply->cnt; i++) {
2410
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2411
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2412
                buf[reply->segs[i].targetlen] = 0;
2413
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2414
                                (unsigned long long) reply->segs[i].offset,
2415
                                (unsigned long long) reply->segs[i].size);
2416
        }
2417
        return 0;
2418
}
2419

    
2420
/*
 * do_clone: create a new map (clone) derived from parent @map.
 *
 * The clone target name comes from the request's target; its size comes
 * from the clone payload (xclone->size), falling back to the parent's
 * size when the payload says -1. Every object slot copies the parent's
 * object name when the parent has a node for that index, or points at
 * the shared zero block otherwise. The new map is written out and then
 * closed (which also drops it from the cache).
 *
 * Returns 0 on success, -1 on any failure.
 *
 * NOTE(review): on the error paths after map_nodes is allocated, the
 * array (and any st_cond_new() handles created so far) is not freed
 * here — presumably do_close/do_dropcache tears down inserted nodes,
 * but nodes not yet inserted leak. Verify against the teardown helpers.
 */
//here map is the parent map
static int do_clone(struct peer_req *pr, struct map *map)
{
	int r;
	struct peerd *peer = pr->peer;
	struct mapperd *mapper = __get_mapperd(peer);
	char *target = xseg_get_target(peer->xseg, pr->req);
	struct map *clonemap;
	struct xseg_request_clone *xclone =
		(struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);

	XSEGLOG2(&lc, I, "Cloning map %s", map->volume);

	clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
	if (!clonemap)
		return -1;

	/* open map to get exclusive access to map */
	r = open_map(pr, clonemap, 0);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}
	/* a successful load means the target already exists — that is an error */
	r = load_map(pr, clonemap);
	if (r >= 0) {
		XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
		goto out_err;
	}

	/* xclone->size == -1 relies on unsigned wraparound to mean
	 * "inherit the parent's size" */
	if (xclone->size == -1)
		clonemap->size = map->size;
	else
		clonemap->size = xclone->size;
	/* a clone may grow the volume, never shrink it */
	if (clonemap->size < map->size){
		XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
				"\n\t for requested clone %s",
				(unsigned long long) xclone->size,
				(unsigned long long) map->size, clonemap->volume);
		goto out_err;
	}
	if (clonemap->size > MAX_VOLUME_SIZE) {
		XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
				"\n\t for volume %s",
				clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
		goto out_err;
	}

	//alloc and init map_nodes
	//unsigned long c = clonemap->size/block_size + 1;
	unsigned long c = calc_map_obj(clonemap);
	struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
	if (!map_nodes){
		goto out_err;
	}
	int i;
	//for (i = 0; i < clonemap->size/block_size + 1; i++) {
	for (i = 0; i < c; i++) {
		/* inherit the parent's object name where one exists,
		 * otherwise point the slot at the zero block */
		struct map_node *mn = get_mapnode(map, i);
		if (mn) {
			strncpy(map_nodes[i].object, mn->object, mn->objectlen);
			map_nodes[i].objectlen = mn->objectlen;
			put_mapnode(mn);
		} else {
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
		}
		map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
		map_nodes[i].flags = 0;
		map_nodes[i].objectidx = i;
		map_nodes[i].map = clonemap;
		map_nodes[i].ref = 1;
		map_nodes[i].waiters = 0;
		map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
		r = insert_object(clonemap, &map_nodes[i]);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
			goto out_err;
		}
	}

	/* persist the freshly built clone map */
	r = write_map(pr, clonemap);
	if (r < 0){
		XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
		goto out_err;
	}
	do_close(pr, clonemap);
	return 0;

out_err:
	/* do_close releases the exclusive lock (if held) and drops the cache */
	do_close(pr, clonemap);
	return -1;
}
2513

    
2514
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2515
{
2516
        int r, opened = 0;
2517
        if (flags & MF_EXCLUSIVE){
2518
                r = open_map(pr, map, flags);
2519
                if (r < 0) {
2520
                        if (flags & MF_FORCE){
2521
                                return -1;
2522
                        }
2523
                } else {
2524
                        opened = 1;
2525
                }
2526
        }
2527
        r = load_map(pr, map);
2528
        if (r < 0 && opened){
2529
                close_map(pr, map);
2530
        }
2531
        return r;
2532
}
2533

    
2534
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2535
                        uint32_t flags)
2536
{
2537
        int r;
2538
        struct peerd *peer = pr->peer;
2539
        struct mapperd *mapper = __get_mapperd(peer);
2540
        struct map *map = find_map_len(mapper, name, namelen, flags);
2541
        if (!map){
2542
                if (flags & MF_LOAD){
2543
                        map = create_map(mapper, name, namelen, flags);
2544
                        if (!map)
2545
                                return NULL;
2546
                        r = open_load_map(pr, map, flags);
2547
                        if (r < 0){
2548
                                do_dropcache(pr, map);
2549
                                return NULL;
2550
                        }
2551
                } else {
2552
                        return NULL;
2553
                }
2554
        } else if (map->flags & MF_MAP_DESTROYED){
2555
                return NULL;
2556
        }
2557
        __get_map(map);
2558
        return map;
2559

    
2560
}
2561

    
2562
static int map_action(int (action)(struct peer_req *pr, struct map *map),
2563
                struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2564
{
2565
        //struct peerd *peer = pr->peer;
2566
        struct map *map;
2567
start:
2568
        map = get_map(pr, name, namelen, flags);
2569
        if (!map)
2570
                return -1;
2571
        if (map->flags & MF_MAP_NOT_READY){
2572
                wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2573
                put_map(map);
2574
                goto start;
2575
        }
2576
        int r = action(pr, map);
2577
        //always drop cache if map not read exclusively
2578
        if (!(map->flags & MF_MAP_EXCLUSIVE))
2579
                do_dropcache(pr, map);
2580
        signal_map(map);
2581
        put_map(map);
2582
        return r;
2583
}
2584

    
2585
void * handle_info(struct peer_req *pr)
2586
{
2587
        struct peerd *peer = pr->peer;
2588
        char *target = xseg_get_target(peer->xseg, pr->req);
2589
        int r = map_action(do_info, pr, target, pr->req->targetlen,
2590
                                MF_ARCHIP|MF_LOAD);
2591
        if (r < 0)
2592
                fail(peer, pr);
2593
        else
2594
                complete(peer, pr);
2595
        ta--;
2596
        return NULL;
2597
}
2598

    
2599
/*
 * handle_clone: st_thread entry for X_CLONE.
 *
 * Two modes, selected by the clone payload:
 *   - xclone->targetlen != 0: clone from an existing (pithos) map via
 *     map_action(do_clone, ...).
 *   - xclone->targetlen == 0: create a brand new volume of xclone->size,
 *     populated entirely with zero-block objects, then persist it.
 *
 * Completes or fails the peer request based on r, and decrements the
 * active-thread counter.
 *
 * NOTE(review): on error paths after map_nodes is allocated, the array
 * and any st_cond_new() handles for not-yet-inserted nodes are not freed
 * here — presumably do_close tears down inserted nodes; verify.
 */
void * handle_clone(struct peer_req *pr)
{
	int r;
	struct peerd *peer = pr->peer;
	struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
	if (!xclone) {
		r = -1;
		goto out;
	}

	if (xclone->targetlen){
		/* if snap was defined */
		//support clone only from pithos
		r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
					MF_LOAD);
	} else {
		/* else try to create a new volume */
		XSEGLOG2(&lc, I, "Creating volume");
		if (!xclone->size){
			XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
			r = -1;
			goto out;
		}
		if (xclone->size > MAX_VOLUME_SIZE) {
			XSEGLOG2(&lc, E, "Requested size %llu > max volume "
					"size %llu", xclone->size, MAX_VOLUME_SIZE);
			r = -1;
			goto out;
		}

		struct map *map;
		char *target = xseg_get_target(peer->xseg, pr->req);

		//create a new empty map of size
		map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
		if (!map){
			r = -1;
			goto out;
		}
		/* open map to get exclusive access to map */
		r = open_map(pr, map, 0);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
			XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
			do_dropcache(pr, map);
			r = -1;
			goto out;
		}
		/* a successful load means the volume already exists — error */
		r = load_map(pr, map);
		if (r >= 0) {
			XSEGLOG2(&lc, E, "Map exists %s", map->volume);
			do_close(pr, map);
			r = -1;
			goto out;
		}
		map->size = xclone->size;
		//populate_map with zero objects;
		/* round object count up to cover a partial trailing block */
		uint64_t nr_objs = xclone->size / block_size;
		if (xclone->size % block_size)
			nr_objs++;

		struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
		if (!map_nodes){
			do_close(pr, map);
			r = -1;
			goto out;
		}

		/* every object of a fresh volume starts as the zero block */
		uint64_t i;
		for (i = 0; i < nr_objs; i++) {
			strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
			map_nodes[i].objectlen = ZERO_BLOCK_LEN;
			map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
			map_nodes[i].flags = 0;
			map_nodes[i].objectidx = i;
			map_nodes[i].map = map;
			map_nodes[i].ref = 1;
			map_nodes[i].waiters = 0;
			map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
			r = insert_object(map, &map_nodes[i]);
			if (r < 0){
				do_close(pr, map);
				r = -1;
				goto out;
			}
		}
		r = write_map(pr, map);
		if (r < 0){
			XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
			do_close(pr, map);
			goto out;
		}
		XSEGLOG2(&lc, I, "Volume %s created", map->volume);
		r = 0;
		do_close(pr, map); //drop cache here for consistency
	}
out:
	if (r < 0)
		fail(peer, pr);
	else
		complete(peer, pr);
	ta--;
	return NULL;
}
2703

    
2704
void * handle_mapr(struct peer_req *pr)
2705
{
2706
        struct peerd *peer = pr->peer;
2707
        char *target = xseg_get_target(peer->xseg, pr->req);
2708
        int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2709
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2710
        if (r < 0)
2711
                fail(peer, pr);
2712
        else
2713
                complete(peer, pr);
2714
        ta--;
2715
        return NULL;
2716
}
2717

    
2718
void * handle_mapw(struct peer_req *pr)
2719
{
2720
        struct peerd *peer = pr->peer;
2721
        char *target = xseg_get_target(peer->xseg, pr->req);
2722
        int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2723
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2724
        if (r < 0)
2725
                fail(peer, pr);
2726
        else
2727
                complete(peer, pr);
2728
        XSEGLOG2(&lc, D, "Ta: %d", ta);
2729
        ta--;
2730
        return NULL;
2731
}
2732

    
2733
void * handle_destroy(struct peer_req *pr)
2734
{
2735
        struct peerd *peer = pr->peer;
2736
        char *target = xseg_get_target(peer->xseg, pr->req);
2737
        /* request EXCLUSIVE access, but do not force it.
2738
         * check if succeeded on do_destroy
2739
         */
2740
        int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2741
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2742
        if (r < 0)
2743
                fail(peer, pr);
2744
        else
2745
                complete(peer, pr);
2746
        ta--;
2747
        return NULL;
2748
}
2749

    
2750
void * handle_open(struct peer_req *pr)
2751
{
2752
        struct peerd *peer = pr->peer;
2753
        char *target = xseg_get_target(peer->xseg, pr->req);
2754
        //here we do not want to load
2755
        int r = map_action(do_open, pr, target, pr->req->targetlen,
2756
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2757
        if (r < 0)
2758
                fail(peer, pr);
2759
        else
2760
                complete(peer, pr);
2761
        ta--;
2762
        return NULL;
2763
}
2764

    
2765
void * handle_close(struct peer_req *pr)
2766
{
2767
        struct peerd *peer = pr->peer;
2768
        char *target = xseg_get_target(peer->xseg, pr->req);
2769
        //here we do not want to load
2770
        int r = map_action(do_close, pr, target, pr->req->targetlen,
2771
                                MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2772
        if (r < 0)
2773
                fail(peer, pr);
2774
        else
2775
                complete(peer, pr);
2776
        ta--;
2777
        return NULL;
2778
}
2779

    
2780
void * handle_snapshot(struct peer_req *pr)
2781
{
2782
        struct peerd *peer = pr->peer;
2783
        char *target = xseg_get_target(peer->xseg, pr->req);
2784
        /* request EXCLUSIVE access, but do not force it.
2785
         * check if succeeded on do_snapshot
2786
         */
2787
        int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2788
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2789
        if (r < 0)
2790
                fail(peer, pr);
2791
        else
2792
                complete(peer, pr);
2793
        ta--;
2794
        return NULL;
2795
}
2796

    
2797
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2798
                        struct xseg_request *req)
2799
{
2800
        //struct mapperd *mapper = __get_mapperd(peer);
2801
        struct mapper_io *mio = __get_mapper_io(pr);
2802
        void *(*action)(struct peer_req *) = NULL;
2803

    
2804
        mio->state = ACCEPTED;
2805
        mio->err = 0;
2806
        mio->cb = NULL;
2807
        switch (pr->req->op) {
2808
                /* primary xseg operations of mapper */
2809
                case X_CLONE: action = handle_clone; break;
2810
                case X_MAPR: action = handle_mapr; break;
2811
                case X_MAPW: action = handle_mapw; break;
2812
                case X_SNAPSHOT: action = handle_snapshot; break;
2813
                case X_INFO: action = handle_info; break;
2814
                case X_DELETE: action = handle_destroy; break;
2815
                case X_OPEN: action = handle_open; break;
2816
                case X_CLOSE: action = handle_close; break;
2817
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2818
        }
2819
        if (action){
2820
                ta++;
2821
                mio->active = 1;
2822
                st_thread_create(action, pr, 0, 0);
2823
        }
2824
        return 0;
2825

    
2826
}
2827

    
2828
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2829
                enum dispatch_reason reason)
2830
{
2831
        struct mapperd *mapper = __get_mapperd(peer);
2832
        (void) mapper;
2833
        struct mapper_io *mio = __get_mapper_io(pr);
2834
        (void) mio;
2835

    
2836

    
2837
        if (reason == dispatch_accept)
2838
                dispatch_accepted(peer, pr, req);
2839
        else {
2840
                if (mio->cb){
2841
                        mio->cb(pr, req);
2842
                } else { 
2843
                        signal_pr(pr);
2844
                }
2845
        }
2846
        return 0;
2847
}
2848

    
2849
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2850
{
2851
        int i;
2852

    
2853
        //FIXME error checks
2854
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2855
        peer->priv = mapperd;
2856
        mapper = mapperd;
2857
        mapper->hashmaps = xhash_new(3, STRING);
2858

    
2859
        for (i = 0; i < peer->nr_ops; i++) {
2860
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2861
                mio->copyups_nodes = xhash_new(3, INTEGER);
2862
                mio->copyups = 0;
2863
                mio->err = 0;
2864
                mio->active = 0;
2865
                peer->peer_reqs[i].priv = mio;
2866
        }
2867

    
2868
        mapper->bportno = -1;
2869
        mapper->mbportno = -1;
2870
        BEGIN_READ_ARGS(argc, argv);
2871
        READ_ARG_ULONG("-bp", mapper->bportno);
2872
        READ_ARG_ULONG("-mbp", mapper->mbportno);
2873
        END_READ_ARGS();
2874
        if (mapper->bportno == -1){
2875
                XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2876
                usage(argv[0]);
2877
                return -1;
2878
        }
2879
        if (mapper->mbportno == -1){
2880
                XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2881
                usage(argv[0]);
2882
                return -1;
2883
        }
2884

    
2885
        const struct sched_param param = { .sched_priority = 99 };
2886
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2887
        /* FIXME maybe place it in peer
2888
         * should be done for each port (sportno to eportno)
2889
         */
2890
        xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2891
        xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2892

    
2893

    
2894
//        test_map(peer);
2895

    
2896
        return 0;
2897
}
2898

    
2899
/* FIXME this should not be here */
/*
 * wait_reply: synchronously poll all of this peer's ports until the
 * reply for @expected_req arrives.
 *
 * Used during finalization, when the normal dispatch loop is no longer
 * servicing replies. Any received request that is not the expected one
 * (or carries no pr data) is treated as stale: it is responded back to
 * its source port — or dropped if no port can be allocated.
 *
 * Returns 0 once the expected reply is consumed. Never returns
 * otherwise (loops forever, sleeping up to 1s between polls).
 */
int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
{
	struct xseg *xseg = peer->xseg;
	xport portno_start = peer->portno_start;
	xport portno_end = peer->portno_end;
	struct peer_req *pr;
	xport i;
	int  r, c = 0;
	struct xseg_request *received;
	xseg_prepare_wait(xseg, portno_start);
	while(1) {
		XSEGLOG2(&lc, D, "Attempting to check for reply");
		/* keep sweeping the ports while at least one of them
		 * produced a request on the previous pass */
		c = 1;
		while (c){
			c = 0;
			for (i = portno_start; i <= portno_end; i++) {
				received = xseg_receive(xseg, i, 0);
				if (received) {
					c = 1;
					r =  xseg_get_req_data(xseg, received, (void **) &pr);
					if (r < 0 || !pr || received != expected_req){
						/* stale request: bounce it back to its source */
						XSEGLOG2(&lc, W, "Received request with no pr data\n");
						xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
						if (p == NoPort){
							XSEGLOG2(&lc, W, "Could not respond stale request");
							xseg_put_request(xseg, received, portno_start);
							continue;
						} else {
							xseg_signal(xseg, p);
						}
					} else {
						/* got the reply we were waiting for */
						xseg_cancel_wait(xseg, portno_start);
						return 0;
					}
				}
			}
		}
		xseg_wait_signal(xseg, 1000000UL);
	}
}
2940

    
2941

    
2942
/*
 * custom_peer_finalize: shutdown hook.
 *
 * Walks every cached map and, for those held with exclusive access,
 * issues a close request and synchronously waits for its reply via
 * wait_reply() (the normal dispatch loop is no longer running at this
 * point). Failures to close are logged but not fatal.
 */
void custom_peer_finalize(struct peerd *peer)
{
	struct mapperd *mapper = __get_mapperd(peer);
	/* borrow a peer request slot to issue the close requests */
	struct peer_req *pr = alloc_peer_req(peer);
	if (!pr){
		XSEGLOG2(&lc, E, "Cannot get peer request");
		return;
	}
	struct map *map;
	struct xseg_request *req;
	xhash_iter_t it;
	xhashidx key, val;
	xhash_iter_init(mapper->hashmaps, &it);
	while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
		map = (struct map *)val;
		/* only exclusively held maps carry a lock that must be released */
		if (!(map->flags & MF_MAP_EXCLUSIVE))
			continue;
		req = __close_map(pr, map);
		if (!req)
			continue;
		/* block until the close reply arrives */
		wait_reply(peer, req);
		if (!(req->state & XS_SERVED))
			XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
		map->flags &= ~MF_MAP_CLOSING;
		xseg_put_request(peer->xseg, req, pr->portno);
	}
	return;
}
2972

    
2973
void print_obj(struct map_node *mn)
2974
{
2975
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2976
                        (unsigned long long) mn->objectidx, mn->object, 
2977
                        (unsigned int) mn->objectlen, 
2978
                        (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2979
}
2980

    
2981
void print_map(struct map *m)
2982
{
2983
        uint64_t nr_objs = m->size/block_size;
2984
        if (m->size % block_size)
2985
                nr_objs++;
2986
        fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2987
                        m->volume, m->volumelen, 
2988
                        (unsigned long long) m->size, 
2989
                        (unsigned long long) nr_objs,
2990
                        m->version);
2991
        uint64_t i;
2992
        struct map_node *mn;
2993
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2994
                return;
2995
        for (i = 0; i < nr_objs; i++) {
2996
                mn = find_object(m, i);
2997
                if (!mn){
2998
                        printf("object idx [%llu] not found!\n", (unsigned long long) i);
2999
                        continue;
3000
                }
3001
                print_obj(mn);
3002
        }
3003
}
3004

    
3005
/*
3006
void test_map(struct peerd *peer)
3007
{
3008
        int i,j, ret;
3009
        //struct sha256_ctx sha256ctx;
3010
        unsigned char buf[SHA256_DIGEST_SIZE];
3011
        char buf_new[XSEG_MAX_TARGETLEN + 20];
3012
        struct map *m = malloc(sizeof(struct map));
3013
        strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
3014
        m->volume[XSEG_MAX_TARGETLEN] = 0;
3015
        strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
3016
        buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
3017
        m->volumelen = XSEG_MAX_TARGETLEN;
3018
        m->size = 100*block_size;
3019
        m->objects = xhash_new(3, INTEGER);
3020
        struct map_node *map_node = calloc(100, sizeof(struct map_node));
3021
        for (i = 0; i < 100; i++) {
3022
                sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
3023
                gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
3024
                
3025
                for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
3026
                        sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
3027
                }
3028
                map_node[i].objectidx = i;
3029
                map_node[i].objectlen = XSEG_MAX_TARGETLEN;
3030
                map_node[i].flags = MF_OBJECT_EXIST;
3031
                ret = insert_object(m, &map_node[i]);
3032
        }
3033

3034
        char *data = malloc(block_size);
3035
        mapheader_to_map(m, data);
3036
        uint64_t pos = mapheader_size;
3037

3038
        for (i = 0; i < 100; i++) {
3039
                map_node = find_object(m, i);
3040
                if (!map_node){
3041
                        printf("no object node %d \n", i);
3042
                        exit(1);
3043
                }
3044
                object_to_map(data+pos, map_node);
3045
                pos += objectsize_in_map;
3046
        }
3047
//        print_map(m);
3048

3049
        struct map *m2 = malloc(sizeof(struct map));
3050
        strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3051
        m->volume[XSEG_MAX_TARGETLEN] = 0;
3052
        m->volumelen = XSEG_MAX_TARGETLEN;
3053

3054
        m2->objects = xhash_new(3, INTEGER);
3055
        ret = read_map(peer, m2, data);
3056
//        print_map(m2);
3057

3058
        int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3059
        ssize_t r, sum = 0;
3060
        while (sum < block_size) {
3061
                r = write(fd, data + sum, block_size -sum);
3062
                if (r < 0){
3063
                        perror("write");
3064
                        printf("write error\n");
3065
                        exit(1);
3066
                } 
3067
                sum += r;
3068
        }
3069
        close(fd);
3070
        map_node = find_object(m, 0);
3071
        free(map_node);
3072
        free(m);
3073
}
3074
*/