Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-mapperd.c @ d8a852fa

History | View | Annotate | Download (80 kB)

1
/*
2
 * Copyright 2012 GRNET S.A. All rights reserved.
3
 *
4
 * Redistribution and use in source and binary forms, with or
5
 * without modification, are permitted provided that the following
6
 * conditions are met:
7
 *
8
 *   1. Redistributions of source code must retain the above
9
 *      copyright notice, this list of conditions and the following
10
 *      disclaimer.
11
 *   2. Redistributions in binary form must reproduce the above
12
 *      copyright notice, this list of conditions and the following
13
 *      disclaimer in the documentation and/or other materials
14
 *      provided with the distribution.
15
 *
16
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
 * POSSIBILITY OF SUCH DAMAGE.
28
 *
29
 * The views and conclusions contained in the software and
30
 * documentation are those of the authors and should not be
31
 * interpreted as representing official policies, either expressed
32
 * or implied, of GRNET S.A.
33
 */
34

    
35
#include <stdio.h>
36
#include <unistd.h>
37
#include <sys/types.h>
38
#include <pthread.h>
39
#include <xseg/xseg.h>
40
#include <peer.h>
41
#include <time.h>
42
#include <xtypes/xlock.h>
43
#include <xtypes/xhash.h>
44
#include <xseg/protocol.h>
45
#include <sys/stat.h>
46
#include <fcntl.h>
47
#include <errno.h>
48
#include <sched.h>
49
#include <sys/syscall.h>
50
#include <openssl/sha.h>
51
#include <ctype.h>
52

    
53
/* general mapper flags */
54
#define MF_LOAD         (1 << 0)
55
#define MF_EXCLUSIVE         (1 << 1)
56
#define MF_FORCE         (1 << 2)
57
#define MF_ARCHIP        (1 << 3)
58

    
59
#ifndef SHA256_DIGEST_SIZE
60
#define SHA256_DIGEST_SIZE 32
61
#endif
62
/* hex representation of sha256 value takes up double the sha256 size */
63
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
64

    
65
#define block_size (1<<22) //FIXME this should be defined here?
66

    
67
/* transparency byte + max object len in disk */
68
#define objectsize_in_map (1 + SHA256_DIGEST_SIZE)
69

    
70
/* Map header contains:
71
 *         map version
72
 *         volume size
73
 */
74
#define mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
75

    
76

    
77
#define MAPPER_PREFIX "archip_"
78
#define MAPPER_PREFIX_LEN 7
79

    
80
#define MAX_REAL_VOLUME_LEN (XSEG_MAX_TARGETLEN - MAPPER_PREFIX_LEN)
81
#define MAX_VOLUME_LEN (MAPPER_PREFIX_LEN + MAX_REAL_VOLUME_LEN)
82

    
83
#if MAX_VOLUME_LEN > XSEG_MAX_TARGETLEN
84
#error         "XSEG_MAX_TARGETLEN should be at least MAX_VOLUME_LEN"
85
#endif
86

    
87
#define MAX_OBJECT_LEN (MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE)
88

    
89
#if MAX_OBJECT_LEN > XSEG_MAX_TARGETLEN
90
#error         "XSEG_MAX_TARGETLEN should be at least MAX_OBJECT_LEN"
91
#endif
92

    
93
#define MAX_VOLUME_SIZE \
94
((uint64_t) (((block_size-mapheader_size)/objectsize_in_map)* block_size))
95

    
96

    
97
//char *zero_block="0000000000000000000000000000000000000000000000000000000000000000";
98

    
99
/* pithos considers this a block full of zeros, so should we.
100
 * it is actually the sha256 hash of nothing.
101
 */
102
char *zero_block="e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
103
#define ZERO_BLOCK_LEN (64) /* strlen(zero_block) */
104

    
105
/* dispatch_internal mapper states */
106
enum mapper_state {
107
        ACCEPTED = 0,
108
        WRITING = 1,
109
        COPYING = 2,
110
        DELETING = 3,
111
        DROPPING_CACHE = 4
112
};
113

    
114
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
115

    
116

    
117
/* mapper object flags */
118
#define MF_OBJECT_EXIST                (1 << 0)
119
#define MF_OBJECT_COPYING        (1 << 1)
120
#define MF_OBJECT_WRITING        (1 << 2)
121
#define MF_OBJECT_DELETING        (1 << 3)
122
#define MF_OBJECT_DESTROYED        (1 << 5)
123
#define MF_OBJECT_SNAPSHOTTING        (1 << 6)
124

    
125
#define MF_OBJECT_NOT_READY        (MF_OBJECT_COPYING|MF_OBJECT_WRITING|\
126
                                MF_OBJECT_DELETING|MF_OBJECT_SNAPSHOTTING)
127
/* In-memory state for one map object (one block-sized chunk of a volume). */
struct map_node {
        uint32_t flags;                          /* MF_OBJECT_* flags */
        uint32_t objectidx;                      /* index of this object within its map */
        uint32_t objectlen;                      /* strlen(object) */
        char object[MAX_OBJECT_LEN + 1];         /* NULL terminated string */
        struct map *map;                         /* owning map */
        uint32_t ref;                            /* reference count */
        uint32_t waiters;                        /* nr of threads blocked in wait_on_mapnode */
        st_cond_t cond;                          /* woken by signal_mapnode */
};
137

    
138

    
139
#define wait_on_pr(__pr, __condition__)         \
140
        while (__condition__){                        \
141
                ta--;                                \
142
                __get_mapper_io(pr)->active = 0;\
143
                XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
144
                st_cond_wait(__pr->cond);        \
145
        }
146

    
147
#define wait_on_mapnode(__mn, __condition__)        \
148
        while (__condition__){                        \
149
                ta--;                                \
150
                __mn->waiters++;                \
151
                XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, \
152
                        ta: %u",  __mn, __mn->object, __mn->waiters, ta);  \
153
                st_cond_wait(__mn->cond);        \
154
        }
155

    
156
#define wait_on_map(__map, __condition__)        \
157
        while (__condition__){                        \
158
                ta--;                                \
159
                __map->waiters++;                \
160
                XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",\
161
                                   __map, __map->volume, __map->waiters, ta); \
162
                st_cond_wait(__map->cond);        \
163
        }
164

    
165
#define signal_pr(__pr)                                \
166
        do {                                         \
167
                if (!__get_mapper_io(pr)->active){\
168
                        ta++;                        \
169
                        XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta);\
170
                        __get_mapper_io(pr)->active = 1;\
171
                        st_cond_signal(__pr->cond);        \
172
                }                                \
173
        }while(0)
174

    
175
#define signal_map(__map)                        \
176
        do {                                         \
177
                if (__map->waiters) {                \
178
                        ta += 1;                \
179
                        XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, \
180
                        ta: %u",  __map, __map->volume, __map->waiters, ta); \
181
                        __map->waiters--;        \
182
                        st_cond_signal(__map->cond);        \
183
                }                                \
184
        }while(0)
185

    
186
#define signal_mapnode(__mn)                        \
187
        do {                                         \
188
                if (__mn->waiters) {                \
189
                        ta += __mn->waiters;        \
190
                        XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: \
191
                        %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
192
                        __mn->waiters = 0;        \
193
                        st_cond_broadcast(__mn->cond);        \
194
                }                                \
195
        }while(0)
196

    
197

    
198
/* map flags */
199
#define MF_MAP_LOADING                (1 << 0)
200
#define MF_MAP_DESTROYED        (1 << 1)
201
#define MF_MAP_WRITING                (1 << 2)
202
#define MF_MAP_DELETING                (1 << 3)
203
#define MF_MAP_DROPPING_CACHE        (1 << 4)
204
#define MF_MAP_EXCLUSIVE        (1 << 5)
205
#define MF_MAP_OPENING                (1 << 6)
206
#define MF_MAP_CLOSING                (1 << 7)
207
#define MF_MAP_DELETED                (1 << 8)
208
#define MF_MAP_SNAPSHOTTING        (1 << 9)
209

    
210
#define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
211
                                MF_MAP_DROPPING_CACHE|MF_MAP_OPENING|               \
212
                                MF_MAP_SNAPSHOTTING)
213

    
214
/* In-memory state for one volume map. */
struct map {
        uint32_t version;                /* on-disk map format version (0: pithos, 1: archipelago) */
        uint32_t flags;                  /* MF_MAP_* flags */
        uint64_t size;                   /* volume size in bytes */
        uint32_t volumelen;              /* strlen(volume) */
        char volume[MAX_VOLUME_LEN + 1]; /* NULL terminated string */
        xhash_t *objects;         /* obj_index --> map_node */
        uint32_t ref;                    /* reference count */
        uint32_t waiters;                /* nr of threads blocked in wait_on_map */
        st_cond_t cond;                  /* woken by signal_map */
};
225

    
226
/* Top-level mapper peer state, stored in peerd->priv. */
struct mapperd {
        xport bportno;                /* blocker that accesses data */
        xport mbportno;                /* blocker that accesses maps */
        xhash_t *hashmaps; // hash_function(target) --> struct map
};
231

    
232
/* Per-request mapper state, stored in peer_req->priv. */
struct mapper_io {
        volatile uint32_t copyups;        /* nr of copyups pending, issued by this mapper io */
        xhash_t *copyups_nodes;                /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
        struct map_node *copyup_node;
        volatile int err;                        /* error flag */
        volatile uint64_t del_pending;        /* presumably nr of pending deletions — TODO confirm against users */
        volatile uint64_t snap_pending;        /* presumably nr of pending snapshots — TODO confirm against users */
        uint64_t delobj;
        uint64_t dcobj;
        cb_t cb;                        /* completion callback invoked on request dispatch */
        enum mapper_state state;
        volatile int active;                /* 0 while the pr is parked in wait_on_pr; set by signal_pr */
};
245

    
246
/* global vars */
247
struct mapperd *mapper;
248

    
249
void print_map(struct map *m);
250

    
251

    
252
/* Print the mapper-specific command line options on stderr. */
void custom_peer_usage()
{
        static const char *usage_text =
                "Custom peer options: \n"
                "-bp  : port for block blocker(!)\n"
                "-mbp : port for map blocker\n"
                "\n";
        fprintf(stderr, "%s", usage_text);
}
259

    
260

    
261
/*
262
 * Helper functions
263
 */
264

    
265
static inline struct mapperd * __get_mapperd(struct peerd *peer)
266
{
267
        return (struct mapperd *) peer->priv;
268
}
269

    
270
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
271
{
272
        return (struct mapper_io *) pr->priv;
273
}
274

    
275
/* Return the number of objects backing a map: ceil(map->size / block_size).
 * A size equal to (uint64_t)-1 yields 0 — appears to be an "unknown size"
 * sentinel; TODO confirm against the code that initializes map->size.
 */
static inline uint64_t calc_map_obj(struct map *map)
{
        if (map->size == -1)
                return 0;
        uint64_t nr_objs = map->size / block_size;
        if (map->size % block_size)
                nr_objs++;
        return nr_objs;
}
284

    
285
/* Count how many block_size-sized objects the request's byte range
 * [offset, offset + size) touches, accounting for the offset of the first
 * object. Always returns at least 1, even for a zero-sized request.
 */
static uint32_t calc_nr_obj(struct xseg_request *req)
{
        unsigned int r = 1;
        uint64_t rem_size = req->size;
        uint64_t obj_offset = req->offset & (block_size -1); //modulo
        /* the first object may be covered only partially */
        uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
        rem_size -= obj_size;
        while (rem_size > 0) {
                obj_size = (rem_size > block_size) ? block_size : rem_size;
                rem_size -= obj_size;
                r++;
        }

        return r;
}
300

    
301
/* hexlify function.
302
 * Unsafe. Doesn't check if data length is odd!
303
 */
304

    
305
static void hexlify(unsigned char *data, char *hex)
306
{
307
        int i;
308
        for (i=0; i<SHA256_DIGEST_LENGTH; i++)
309
                sprintf(hex+2*i, "%02x", data[i]);
310
}
311

    
312
/* Decode 2*SHA256_DIGEST_LENGTH hex characters from hex into raw bytes in
 * data. Any non-hex character silently decodes as a zero nibble — no error
 * is reported, matching hexlify's "unsafe" contract.
 */
static void unhexlify(char *hex, unsigned char *data)
{
        int i;
        char c;
        for (i=0; i<SHA256_DIGEST_LENGTH; i++){
                data[i] = 0;
                /* high nibble */
                c = hex[2*i];
                if (isxdigit(c)){
                        if (isdigit(c)){
                                c-= '0';
                        }
                        else {
                                c = tolower(c);
                                c = c-'a' + 10;
                        }
                }
                else {
                        c = 0;
                }
                data[i] |= (c << 4) & 0xF0;
                /* low nibble */
                c = hex[2*i+1];
                if (isxdigit(c)){
                        if (isdigit(c)){
                                c-= '0';
                        }
                        else {
                                c = tolower(c);
                                c = c-'a' + 10;
                        }
                }
                else {
                        c = 0;
                }
                data[i] |= c & 0x0F;
        }
}
348

    
349
void merkle_hash(unsigned char *hashes, unsigned long len,
350
                unsigned char hash[SHA256_DIGEST_SIZE])
351
{
352
        uint32_t i, l, s = 2;
353
        uint32_t nr = len/SHA256_DIGEST_SIZE;
354
        unsigned char *buf;
355
        unsigned char tmp_hash[SHA256_DIGEST_SIZE];
356

    
357
        if (!nr){
358
                SHA256(hashes, 0, hash);
359
                return;
360
        }
361
        if (nr == 1){
362
                memcpy(hash, hashes, SHA256_DIGEST_SIZE);
363
                return;
364
        }
365
        while (s < nr)
366
                s = s << 1;
367
        buf = malloc(sizeof(unsigned char)* SHA256_DIGEST_SIZE * s);
368
        memcpy(buf, hashes, nr * SHA256_DIGEST_SIZE);
369
        memset(buf + nr * SHA256_DIGEST_SIZE, 0, (s - nr) * SHA256_DIGEST_SIZE);
370
        for (l = s; l > 1; l = l/2) {
371
                for (i = 0; i < l; i += 2) {
372
                        SHA256(buf + (i * SHA256_DIGEST_SIZE),
373
                                        2 * SHA256_DIGEST_SIZE, tmp_hash);
374
                        memcpy(buf + (i/2 * SHA256_DIGEST_SIZE),
375
                                        tmp_hash, SHA256_DIGEST_SIZE);
376
                }
377
        }
378
        memcpy(hash, buf, SHA256_DIGEST_SIZE);
379
}
380

    
381
/*
382
 * Maps handling functions
383
 */
384

    
385
static struct map * find_map(struct mapperd *mapper, char *volume)
386
{
387
        struct map *m = NULL;
388
        int r = xhash_lookup(mapper->hashmaps, (xhashidx) volume,
389
                                (xhashidx *) &m);
390
        if (r < 0)
391
                return NULL;
392
        return m;
393
}
394

    
395
static struct map * find_map_len(struct mapperd *mapper, char *target,
396
                                        uint32_t targetlen, uint32_t flags)
397
{
398
        char buf[XSEG_MAX_TARGETLEN+1];
399
        if (flags & MF_ARCHIP){
400
                strncpy(buf, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
401
                strncpy(buf + MAPPER_PREFIX_LEN, target, targetlen);
402
                buf[MAPPER_PREFIX_LEN + targetlen] = 0;
403
                targetlen += MAPPER_PREFIX_LEN;
404
        }
405
        else {
406
                strncpy(buf, target, targetlen);
407
                buf[targetlen] = 0;
408
        }
409

    
410
        if (targetlen > MAX_VOLUME_LEN){
411
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
412
                                        targetlen, MAX_VOLUME_LEN);
413
                return NULL;
414
        }
415

    
416
        XSEGLOG2(&lc, D, "looking up map %s, len %u",
417
                        buf, targetlen);
418
        return find_map(mapper, buf);
419
}
420

    
421

    
422
/* Insert a map into the mapper's volume hash, keyed by map->volume.
 * Returns <0 if a map with the same volume already exists or the hash
 * cannot be grown; otherwise returns the xhash_insert result.
 */
static int insert_map(struct mapperd *mapper, struct map *map)
{
        int r = -1;

        if (find_map(mapper, map->volume)){
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
                goto out;
        }

        XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
                        map->volume, strlen(map->volume), (unsigned long) map);
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
        while (r == -XHASH_ERESIZE) {
                /* hash table full: grow it and retry. The old table remains
                 * valid if the resize fails. */
                xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
                                        (unsigned long long) shift);
                        goto out;
                }
                mapper->hashmaps = new_hashmap;
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
        }
out:
        return r;
}
448

    
449
/* Remove a map from the mapper's volume hash, shrinking the hash table as
 * needed. Does not free the map itself. Returns the xhash_delete result.
 */
static int remove_map(struct mapperd *mapper, struct map *map)
{
        int r = -1;

        //assert no pending pr on map

        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
        while (r == -XHASH_ERESIZE) {
                /* hash table sparse: shrink it and retry. The old table
                 * remains valid if the resize fails. */
                xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, 0, NULL);
                if (!new_hashmap){
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
                                        (unsigned long long) shift);
                        goto out;
                }
                mapper->hashmaps = new_hashmap;
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
        }
out:
        return r;
}
470

    
471
/* Allocate and submit an X_RELEASE request for map to the map blocker port
 * and mark the map MF_MAP_CLOSING. Returns the in-flight request (the
 * caller must wait for completion and put it), or NULL on failure.
 */
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
{
        int r;
        xport p;
        struct peerd *peer = pr->peer;
        struct xseg_request *req;
        struct mapperd *mapper = __get_mapperd(peer);
        void *dummy;

        XSEGLOG2(&lc, I, "Closing map %s", map->volume);

        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
        if (!req){
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
                                map->volume);
                goto out_err;
        }

        /* target only; a release carries no data payload */
        r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
                                map->volume);
                goto out_put;
        }

        char *reqtarget = xseg_get_target(peer->xseg, req);
        if (!reqtarget)
                goto out_put;
        strncpy(reqtarget, map->volume, req->targetlen);
        req->op = X_RELEASE;
        req->size = 0;
        req->offset = 0;
        /* attach pr so the completion path can locate and wake us */
        r = xseg_set_req_data(peer->xseg, req, pr);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
                                map->volume);
                goto out_put;
        }
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
        if (p == NoPort){
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
                                map->volume);
                goto out_unset;
        }
        /* NOTE(review): xseg_signal's return value is ignored here */
        r = xseg_signal(peer->xseg, p);
        map->flags |= MF_MAP_CLOSING;

        XSEGLOG2(&lc, I, "Map %s closing", map->volume);
        return req;

out_unset:
        xseg_get_req_data(peer->xseg, req, &dummy);
out_put:
        xseg_put_request(peer->xseg, req, pr->portno);
out_err:
        return NULL;
}
528

    
529
/* Synchronously close a map: submit the release request via __close_map and
 * block the current thread (wait_on_pr) until the request fails or is
 * served. Returns 0 on success, -1 on failure.
 */
static int close_map(struct peer_req *pr, struct map *map)
{
        int err;
        struct xseg_request *req;
        struct peerd *peer = pr->peer;

        req = __close_map(pr, map);
        if (!req)
                return -1;
        /* sleep until the request reaches a terminal state */
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        map->flags &= ~MF_MAP_CLOSING;
        err = req->state & XS_FAILED;
        xseg_put_request(peer->xseg, req, pr->portno);
        if (err)
                return -1;
        return 0;
}
546

    
547
/*
548
static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
549
                                char *target, uint32_t targetlen, struct map **m)
550
{
551
        struct mapperd *mapper = __get_mapperd(peer);
552
        int r;
553
        *m = find_map(mapper, target, targetlen);
554
        if (*m) {
555
                XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
556
                if ((*m)->flags & MF_MAP_NOT_READY) {
557
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
558
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
559
                        return MF_PENDING;
560
                //} else if ((*m)->flags & MF_MAP_DESTROYED){
561
                //        return -1;
562
                // 
563
                }else {
564
                        XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
565
                        return 0;
566
                }
567
        }
568
        r = open_map(peer, pr, target, targetlen, 0);
569
        if (r < 0)
570
                return -1; //error
571
        return MF_PENDING;        
572
}
573
*/
574
/*
575
 * Object handling functions
576
 */
577

    
578
struct map_node *find_object(struct map *map, uint64_t obj_index)
579
{
580
        struct map_node *mn;
581
        int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
582
        if (r < 0)
583
                return NULL;
584
        return mn;
585
}
586

    
587
static int insert_object(struct map *map, struct map_node *mn)
588
{
589
        //FIXME no find object first
590
        int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
591
        if (r == -XHASH_ERESIZE) {
592
                unsigned long shift = xhash_grow_size_shift(map->objects);
593
                map->objects = xhash_resize(map->objects, shift, 0, NULL);
594
                if (!map->objects)
595
                        return -1;
596
                r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
597
        }
598
        return r;
599
}
600

    
601

    
602
/*
603
 * map read/write functions
604
 *
605
 * version 0 -> pithos map
606
 * version 1 -> archipelago version 1
607
 *
608
 *
609
 * functions
610
 *         int read_object(struct map_node *mn, unsigned char *buf)
611
 *         int prepare_write_object(struct peer_req *pr, struct map *map,
612
 *                                 struct map_node *mn, struct xseg_request *req)
613
 *         int read_map(struct map *m, unsigned char * data)
614
 *         int prepare_write_map(struct peer_req *pr, struct map *map,
615
 *                                          struct xseg_request *req)
616
 */
617

    
618
struct map_functions {
619
        int (*read_object)(struct map_node *mn, unsigned char *buf);
620
          int (*prepare_write_object)(struct peer_req *pr, struct map *map,
621
                                  struct map_node *mn, struct xseg_request *req);
622
          int (*read_map)(struct map *m, unsigned char * data);
623
         int (*prepare_write_map)(struct peer_req *pr, struct map *map,
624
                                           struct xseg_request *req);
625
};
626

    
627
/* version 0 functions */
628

    
629
/* no header */
630
#define v0_mapheader_size 0
631
/* just the unhexlified name */
632
#define v0_objectsize_in_map SHA256_DIGEST_SIZE
633

    
634
static inline int read_object_v0(struct map_node *mn, unsigned char *buf)
635
{
636
        hexlify(buf, mn->object);
637
        mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
638
        mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
639
        mn->flags = MF_OBJECT_EXIST;
640

    
641
        return 0;
642
}
643

    
644
/* Serialize a v0 entry: unhexlify the node's object name back into raw
 * digest bytes at data.
 */
static void v0_object_to_map(struct map_node *mn, unsigned char *data)
{
        unhexlify(mn->object, data);
}
648

    
649
static int prepare_write_object_v0(struct peer_req *pr, struct map *map,
650
                        struct map_node *mn, struct xseg_request *req)
651
{
652
        struct peerd *peer = pr->peer;
653
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, v0_objectsize_in_map);
654
        if (r < 0){
655
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
656
                                "(Map: %s [%llu]",
657
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
658
                return -1;
659
        }
660
        char *target = xseg_get_target(peer->xseg, req);
661
        strncpy(target, map->volume, req->targetlen);
662
        req->size = req->datalen;
663
        req->offset = v0_mapheader_size + mn->objectidx * v0_objectsize_in_map;
664

    
665
        unsigned char *data = xseg_get_data(pr->peer->xseg, req);
666
        v0_object_to_map(mn, data);
667
        return -1;
668
}
669

    
670
/* Parse a v0 (pithos) map blob: a headerless dense array of raw SHA256
 * digests. Scanning stops at the first all-zero digest; the map size is
 * derived from the number of objects found.
 * Returns 0 on success, -1 on allocation failure.
 */
static int read_map_v0(struct map *m, unsigned char * data)
{
        int r;
        struct map_node *map_node;
        uint64_t i;
        uint64_t pos = 0;
        uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
        XSEGLOG2(&lc, D, "Max nr_objs %llu", max_nr_objs);
        char nulls[SHA256_DIGEST_SIZE];
        memset(nulls, 0, SHA256_DIGEST_SIZE);
        /* NOTE(review): the node array is allocated in one chunk and never
         * freed here; presumably ownership passes to the map via
         * insert_object and the ref counts — confirm the teardown path.
         */
        map_node = calloc(max_nr_objs, sizeof(struct map_node));
        if (!map_node)
                return -1;
        for (i = 0; i < max_nr_objs; i++) {
                /* an all-zero digest marks the end of the map */
                if (!memcmp(data+pos, nulls, v0_objectsize_in_map))
                        break;
                map_node[i].objectidx = i;
                map_node[i].map = m;
                map_node[i].waiters = 0;
                map_node[i].ref = 1;
                map_node[i].cond = st_cond_new(); //FIXME err check;
                read_object_v0(&map_node[i], data+pos);
                pos += v0_objectsize_in_map;
                r = insert_object(m, &map_node[i]); //FIXME error check
        }
        XSEGLOG2(&lc, D, "Found %llu objects", i);
        m->size = i * block_size;
        return 0;
}
699

    
700
/* Build an X_WRITE request that rewrites the entire v0 map file from the
 * in-memory object nodes. Returns 0 on success, -1 on error.
 */
static int prepare_write_map_v0(struct peer_req *pr, struct map *map,
                                struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        uint64_t i, pos = 0, max_objidx = calc_map_obj(map);
        struct map_node *mn;
        int r = xseg_prep_request(peer->xseg, req, map->volumelen,
                        v0_mapheader_size + max_objidx * v0_objectsize_in_map);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
                return -1;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, map->volume, req->targetlen);
        char *data = xseg_get_data(peer->xseg, req);

        req->op = X_WRITE;
        req->size = req->datalen;
        req->offset = 0;

        /* serialize every object entry in index order */
        for (i = 0; i < max_objidx; i++) {
                mn = find_object(map, i);
                if (!mn){
                        XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
                                        (unsigned long long) i, map->volume);
                        return -1;
                }
                v0_object_to_map(mn, (unsigned char *)(data+pos));
                pos += v0_objectsize_in_map;
        }
        XSEGLOG2(&lc, D, "Prepared %llu objects", i);
        return 0;
}
733

    
734
/* static struct map_functions map_functions_v0 =        { */
735
/*                         .read_object = read_object_v0, */
736
/*                         .read_map = read_map_v0, */
737
/*                         .prepare_write_object = prepare_write_object_v0, */
738
/*                         .prepare_write_map = prepare_write_map_v0 */
739
/* }; */
740
#define map_functions_v0 {                                \
741
                        .read_object = read_object_v0,        \
742
                        .read_map = read_map_v0,        \
743
                        .prepare_write_object = prepare_write_object_v0,\
744
                        .prepare_write_map = prepare_write_map_v0        \
745
                        }
746
/* v1 functions */
747

    
748
/* transparency byte + max object len in disk */
749
#define v1_objectsize_in_map (1 + SHA256_DIGEST_SIZE)
750

    
751
/* Map header contains:
752
 *         map version
753
 *         volume size
754
 */
755
#define v1_mapheader_size (sizeof (uint32_t) + sizeof(uint64_t))
756

    
757
/* Decode a v1 on-disk entry: buf[0] is the transparency byte (non-zero
 * means the object exists as an archipelago object and gets the mapper
 * prefix), followed by the raw SHA256 digest.
 */
static inline int read_object_v1(struct map_node *mn, unsigned char *buf)
{
        char c = buf[0];
        mn->flags = 0;
        if (c){
                /* existing object: prefixed, hexlified name */
                mn->flags |= MF_OBJECT_EXIST;
                strcpy(mn->object, MAPPER_PREFIX);
                hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
                mn->object[MAX_OBJECT_LEN] = 0;
                mn->objectlen = strlen(mn->object);
        }
        else {
                /* non-existing object: bare hexlified digest */
                mn->flags &= ~MF_OBJECT_EXIST;
                hexlify(buf+1, mn->object);
                mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
                mn->objectlen = strlen(mn->object);
        }
        return 0;
}
776

    
777
/* Serialize a v1 entry at buf: a transparency byte followed by the raw
 * digest. For existing (archipelago) objects the common name prefix is
 * stripped before unhexlifying.
 */
static inline void v1_object_to_map(char* buf, struct map_node *mn)
{
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
        if (buf[0]){
                /* strip common prefix */
                unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
        }
        else {
                unhexlify(mn->object, (unsigned char *)(buf+1));
        }
}
788

    
789
/* Prepare an xseg request that writes this object's v1 map entry back to
 * the map file at its slot offset. Returns 0 on success, -1 on error.
 */
static int prepare_write_object_v1(struct peer_req *pr, struct map *map,
                                struct map_node *mn, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, v1_objectsize_in_map);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
                                "(Map: %s [%llu]",
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
                return -1;
        }
        char *target = xseg_get_target(peer->xseg, req);
        strncpy(target, map->volume, req->targetlen);
        req->size = req->datalen;
        req->offset = v1_mapheader_size + mn->objectidx * v1_objectsize_in_map;

        char *data = xseg_get_data(pr->peer->xseg, req);
        v1_object_to_map(data, mn);
        return 0;
}
809

    
810
static int read_map_v1(struct map *m, unsigned char * data)
811
{
812
        int r;
813
        struct map_node *map_node;
814
        uint64_t i;
815
        uint64_t pos = 0;
816
        uint64_t nr_objs;
817

    
818
        /* read header */
819
        m->version = *(uint32_t *) (data + pos);
820
        pos += sizeof(uint32_t);
821
        m->size = *(uint64_t *) (data + pos);
822
        pos += sizeof(uint64_t);
823

    
824
        /* read objects */
825
        nr_objs = m->size / block_size;
826
        if (m->size % block_size)
827
                nr_objs++;
828
        map_node = calloc(nr_objs, sizeof(struct map_node));
829
        if (!map_node)
830
                return -1;
831

    
832
        for (i = 0; i < nr_objs; i++) {
833
                map_node[i].map = m;
834
                map_node[i].objectidx = i;
835
                map_node[i].waiters = 0;
836
                map_node[i].ref = 1;
837
                map_node[i].cond = st_cond_new(); //FIXME err check;
838
                read_object_v1(&map_node[i], data+pos);
839
                pos += objectsize_in_map;
840
                r = insert_object(m, &map_node[i]); //FIXME error check
841
        }
842
        return 0;
843
}
844

    
845
static int prepare_write_map_v1(struct peer_req *pr, struct map *m,
846
                                struct xseg_request *req)
847
{
848
        struct peerd *peer = pr->peer;
849
        uint64_t i, pos = 0, max_objidx = calc_map_obj(m);
850
        struct map_node *mn;
851

    
852
        int r = xseg_prep_request(peer->xseg, req, m->volumelen,
853
                        v1_mapheader_size + max_objidx * v1_objectsize_in_map);
854
        if (r < 0){
855
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", m->volume);
856
                return -1;
857
        }
858
        char *target = xseg_get_target(peer->xseg, req);
859
        strncpy(target, m->volume, req->targetlen);
860
        char *data = xseg_get_data(peer->xseg, req);
861

    
862
        memcpy(data + pos, &m->version, sizeof(m->version));
863
        pos += sizeof(m->version);
864
        memcpy(data + pos, &m->size, sizeof(m->size));
865
        pos += sizeof(m->size);
866

    
867
        req->op = X_WRITE;
868
        req->size = req->datalen;
869
        req->offset = 0;
870

    
871
        for (i = 0; i < max_objidx; i++) {
872
                mn = find_object(m, i);
873
                if (!mn){
874
                        XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
875
                                        (unsigned long long) i, m->volume);
876
                        return -1;
877
                }
878
                v1_object_to_map(data+pos, mn);
879
                pos += v1_objectsize_in_map;
880
        }
881
        return 0;
882
}
883

    
884
/*
 * v1 dispatch entries, spelled as a brace-enclosed initializer (not a
 * named object) so it can be embedded directly in the map_functions[]
 * version table below.
 */
#define map_functions_v1 {                                \
			.read_object = read_object_v1,        \
			.read_map = read_map_v1,        \
			.prepare_write_object = prepare_write_object_v1,\
			.prepare_write_map = prepare_write_map_v1        \
			}
896

    
897
static struct map_functions map_functions[] = { map_functions_v0,
898
                                                map_functions_v1 };
899
#define MAP_LATEST_VERSION 1
900
/* end of functions */
901

    
902

    
903

    
904

    
905

    
906
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
907
{
908
        hexlify(buf, mn->object);
909
        mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
910
        mn->objectlen = HEXLIFIED_SHA256_DIGEST_SIZE;
911
        mn->flags = MF_OBJECT_EXIST;
912
}
913

    
914
static inline void map_to_object(struct map_node *mn, unsigned char *buf)
915
{
916
        char c = buf[0];
917
        mn->flags = 0;
918
        if (c){
919
                mn->flags |= MF_OBJECT_EXIST;
920
                strcpy(mn->object, MAPPER_PREFIX);
921
                hexlify(buf+1, mn->object + MAPPER_PREFIX_LEN);
922
                mn->object[MAX_OBJECT_LEN] = 0;
923
                mn->objectlen = strlen(mn->object);
924
        }
925
        else {
926
                hexlify(buf+1, mn->object);
927
                mn->object[HEXLIFIED_SHA256_DIGEST_SIZE] = 0;
928
                mn->objectlen = strlen(mn->object);
929
        }
930

    
931
}
932

    
933
static inline void object_to_map(char* buf, struct map_node *mn)
934
{
935
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
936
        if (buf[0]){
937
                /* strip common prefix */
938
                unhexlify(mn->object+MAPPER_PREFIX_LEN, (unsigned char *)(buf+1));
939
        }
940
        else {
941
                unhexlify(mn->object, (unsigned char *)(buf+1));
942
        }
943
}
944

    
945
static inline void mapheader_to_map(struct map *m, char *buf)
946
{
947
        uint64_t pos = 0;
948
        memcpy(buf + pos, &m->version, sizeof(m->version));
949
        pos += sizeof(m->version);
950
        memcpy(buf + pos, &m->size, sizeof(m->size));
951
        pos += sizeof(m->size);
952
}
953

    
954

    
955
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr,
956
                                struct map *map, struct map_node *mn)
957
{
958
        int r;
959
        void *dummy;
960
        struct mapperd *mapper = __get_mapperd(peer);
961
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
962
                                                        mapper->mbportno, X_ALLOC);
963
        if (!req){
964
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
965
                                "(Map: %s [%llu]",
966
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
967
                goto out_err;
968
        }
969

    
970
        r = map_functions[map->version].prepare_write_object(pr, map, mn, req);
971
        if (r < 0){
972
                XSEGLOG2(&lc, E, "Cannot prepare write object");
973
                goto out_put;
974
        }
975
        req->op = X_WRITE;
976

    
977
        r = xseg_set_req_data(peer->xseg, req, pr);
978
        if (r < 0){
979
                XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
980
                                "(Map: %s [%llu]",
981
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
982
                goto out_put;
983
        }
984
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
985
        if (p == NoPort){
986
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
987
                                "(Map: %s [%llu]",
988
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
989
                goto out_unset;
990
        }
991
        r = xseg_signal(peer->xseg, p);
992
        if (r < 0)
993
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
994

    
995
        XSEGLOG2(&lc, I, "Writing object %s \n\t"
996
                        "Map: %s [%llu]",
997
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
998

    
999
        return req;
1000

    
1001
out_unset:
1002
        xseg_get_req_data(peer->xseg, req, &dummy);
1003
out_put:
1004
        xseg_put_request(peer->xseg, req, pr->portno);
1005
out_err:
1006
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
1007
                        "(Map: %s [%llu]",
1008
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1009
        return NULL;
1010
}
1011

    
1012
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
1013
{
1014
        int r;
1015
        void *dummy;
1016
        struct peerd *peer = pr->peer;
1017
        struct mapperd *mapper = __get_mapperd(peer);
1018
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1019
                                                        mapper->mbportno, X_ALLOC);
1020
        if (!req){
1021
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
1022
                goto out_err;
1023
        }
1024

    
1025
        r = map_functions[map->version].prepare_write_map(pr, map, req);
1026
        if (r < 0){
1027
                XSEGLOG2(&lc, E, "Cannot prepare write map");
1028
                goto out_put;
1029
        }
1030

    
1031
        req->op = X_WRITE;
1032

    
1033
        r = xseg_set_req_data(peer->xseg, req, pr);
1034
        if (r < 0){
1035
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1036
                                map->volume);
1037
                goto out_put;
1038
        }
1039
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1040
        if (p == NoPort){
1041
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1042
                                map->volume);
1043
                goto out_unset;
1044
        }
1045
        r = xseg_signal(peer->xseg, p);
1046
        if (r < 0)
1047
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
1048

    
1049
        map->flags |= MF_MAP_WRITING;
1050
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
1051
        return req;
1052

    
1053
out_unset:
1054
        xseg_get_req_data(peer->xseg, req, &dummy);
1055
out_put:
1056
        xseg_put_request(peer->xseg, req, pr->portno);
1057
out_err:
1058
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
1059
        return NULL;
1060
}
1061

    
1062
static int write_map(struct peer_req* pr, struct map *map)
1063
{
1064
        int r = 0;
1065
        struct peerd *peer = pr->peer;
1066
        struct xseg_request *req = __write_map(pr, map);
1067
        if (!req)
1068
                return -1;
1069
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1070
        if (req->state & XS_FAILED)
1071
                r = -1;
1072
        xseg_put_request(peer->xseg, req, pr->portno);
1073
        map->flags &= ~MF_MAP_WRITING;
1074
        return r;
1075
}
1076

    
1077
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
1078
{
1079
        int r;
1080
        xport p;
1081
        struct xseg_request *req;
1082
        struct peerd *peer = pr->peer;
1083
        struct mapperd *mapper = __get_mapperd(peer);
1084
        void *dummy;
1085

    
1086
        XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
1087

    
1088
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1089
        if (!req){
1090
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1091
                                m->volume);
1092
                goto out_fail;
1093
        }
1094

    
1095
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1096
        if (r < 0){
1097
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1098
                                m->volume);
1099
                goto out_put;
1100
        }
1101

    
1102
        char *reqtarget = xseg_get_target(peer->xseg, req);
1103
        if (!reqtarget)
1104
                goto out_put;
1105
        strncpy(reqtarget, m->volume, req->targetlen);
1106
        req->op = X_READ;
1107
        req->size = block_size;
1108
        req->offset = 0;
1109
        r = xseg_set_req_data(peer->xseg, req, pr);
1110
        if (r < 0){
1111
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1112
                                m->volume);
1113
                goto out_put;
1114
        }
1115
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1116
        if (p == NoPort){
1117
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1118
                                m->volume);
1119
                goto out_unset;
1120
        }
1121
        r = xseg_signal(peer->xseg, p);
1122

    
1123
        m->flags |= MF_MAP_LOADING;
1124
        XSEGLOG2(&lc, I, "Map %s loading", m->volume);
1125
        return req;
1126

    
1127
out_unset:
1128
        xseg_get_req_data(peer->xseg, req, &dummy);
1129
out_put:
1130
        xseg_put_request(peer->xseg, req, pr->portno);
1131
out_fail:
1132
        return NULL;
1133
}
1134

    
1135
static int read_map (struct map *map, unsigned char *buf)
1136
{
1137
        char nulls[SHA256_DIGEST_SIZE];
1138
        memset(nulls, 0, SHA256_DIGEST_SIZE);
1139

    
1140
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
1141
        if (r) {
1142
                XSEGLOG2(&lc, E, "Read zeros");
1143
                return -1;
1144
        }
1145
        //type 1, archip type, type 0 pithos map
1146
        int type = !memcmp(map->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1147
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
1148
        uint32_t version;
1149
        if (type)
1150
                version = *(uint32_t *) (buf); //version should always be the first uint32_t
1151
        else
1152
                version = 0;
1153
        if (version > MAP_LATEST_VERSION){
1154
                XSEGLOG2(&lc, E, "Map read for map %s failed. Invalid version %u",
1155
                                map->volume, version);
1156
                return -1;
1157
        }
1158

    
1159
        r = map_functions[version].read_map(map, buf);
1160
        if (r < 0){
1161
                XSEGLOG2(&lc, E, "Map read for map %s failed", map->volume);
1162
                return -1;
1163
        }
1164

    
1165
        print_map(map);
1166
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
1167
        return 0;
1168

    
1169
}
1170

    
1171
static int load_map(struct peer_req *pr, struct map *map)
1172
{
1173
        int r = 0;
1174
        struct xseg_request *req;
1175
        struct peerd *peer = pr->peer;
1176
        req = __load_map(pr, map);
1177
        if (!req)
1178
                return -1;
1179
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
1180
        map->flags &= ~MF_MAP_LOADING;
1181
        if (req->state & XS_FAILED){
1182
                XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
1183
                xseg_put_request(peer->xseg, req, pr->portno);
1184
                return -1;
1185
        }
1186
        r = read_map(map, (unsigned char *) xseg_get_data(peer->xseg, req));
1187
        xseg_put_request(peer->xseg, req, pr->portno);
1188
        return r;
1189
}
1190

    
1191
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m,
1192
                                                uint32_t flags)
1193
{
1194
        int r;
1195
        xport p;
1196
        struct xseg_request *req;
1197
        struct peerd *peer = pr->peer;
1198
        struct mapperd *mapper = __get_mapperd(peer);
1199
        void *dummy;
1200

    
1201
        XSEGLOG2(&lc, I, "Opening map %s", m->volume);
1202

    
1203
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
1204
        if (!req){
1205
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
1206
                                m->volume);
1207
                goto out_fail;
1208
        }
1209

    
1210
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
1211
        if (r < 0){
1212
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
1213
                                m->volume);
1214
                goto out_put;
1215
        }
1216

    
1217
        char *reqtarget = xseg_get_target(peer->xseg, req);
1218
        if (!reqtarget)
1219
                goto out_put;
1220
        strncpy(reqtarget, m->volume, req->targetlen);
1221
        req->op = X_ACQUIRE;
1222
        req->size = block_size;
1223
        req->offset = 0;
1224
        if (!(flags & MF_FORCE))
1225
                req->flags = XF_NOSYNC;
1226
        r = xseg_set_req_data(peer->xseg, req, pr);
1227
        if (r < 0){
1228
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
1229
                                m->volume);
1230
                goto out_put;
1231
        }
1232
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1233
        if (p == NoPort){ 
1234
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
1235
                                m->volume);
1236
                goto out_unset;
1237
        }
1238
        r = xseg_signal(peer->xseg, p);
1239

    
1240
        m->flags |= MF_MAP_OPENING;
1241
        XSEGLOG2(&lc, I, "Map %s opening", m->volume);
1242
        return req;
1243

    
1244
out_unset:
1245
        xseg_get_req_data(peer->xseg, req, &dummy);
1246
out_put:
1247
        xseg_put_request(peer->xseg, req, pr->portno);
1248
out_fail:
1249
        return NULL;
1250
}
1251

    
1252
static int open_map(struct peer_req *pr, struct map *map, uint32_t flags)
1253
{
1254
        int err;
1255
        struct xseg_request *req;
1256
        struct peerd *peer = pr->peer;
1257

    
1258
        req = __open_map(pr, map, flags);
1259
        if (!req){
1260
                return -1;
1261
        }
1262
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1263
        map->flags &= ~MF_MAP_OPENING;
1264
        err = req->state & XS_FAILED;
1265
        xseg_put_request(peer->xseg, req, pr->portno);
1266
        if (err)
1267
                return -1;
1268
        else
1269
                map->flags |= MF_MAP_EXCLUSIVE;
1270
        return 0;
1271
}
1272

    
1273
/*
1274
 * copy up functions
1275
 */
1276

    
1277
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
1278
{
1279
        int r = 0;
1280
        if (mn){
1281
                XSEGLOG2(&lc, D, "Inserting (req: %lx, mapnode: %lx) on mio %lx",
1282
                                req, mn, mio);
1283
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1284
                if (r == -XHASH_ERESIZE) {
1285
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
1286
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, 0, NULL);
1287
                        if (!new_hashmap)
1288
                                goto out;
1289
                        mio->copyups_nodes = new_hashmap;
1290
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
1291
                }
1292
                if (r < 0)
1293
                        XSEGLOG2(&lc, E, "Insertion of (%lx, %lx) on mio %lx failed",
1294
                                        req, mn, mio);
1295
        }
1296
        else {
1297
                XSEGLOG2(&lc, D, "Deleting req: %lx from mio %lx",
1298
                                req, mio);
1299
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1300
                if (r == -XHASH_ERESIZE) {
1301
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
1302
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, 0, NULL);
1303
                        if (!new_hashmap)
1304
                                goto out;
1305
                        mio->copyups_nodes = new_hashmap;
1306
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
1307
                }
1308
                if (r < 0)
1309
                        XSEGLOG2(&lc, E, "Deletion of %lx on mio %lx failed",
1310
                                        req, mio);
1311
        }
1312
out:
1313
        return r;
1314
}
1315

    
1316
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
1317
{
1318
        struct map_node *mn;
1319
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
1320
        if (r < 0){
1321
                XSEGLOG2(&lc, W, "Cannot find req %lx on mio %lx", req, mio);
1322
                return NULL;
1323
        }
1324
        XSEGLOG2(&lc, D, "Found mapnode %lx req %lx on mio %lx", mn, req, mio);
1325
        return mn;
1326
}
1327

    
1328
static struct xseg_request * __snapshot_object(struct peer_req *pr,
1329
                                                struct map_node *mn)
1330
{
1331
        struct peerd *peer = pr->peer;
1332
        struct mapperd *mapper = __get_mapperd(peer);
1333
        struct mapper_io *mio = __get_mapper_io(pr);
1334
        //struct map *map = mn->map;
1335
        void *dummy;
1336
        int r = -1;
1337
        xport p;
1338

    
1339
        //assert mn->volume != zero_block
1340
        //assert mn->flags & MF_OBJECT_EXIST
1341
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1342
                                                mapper->bportno, X_ALLOC);
1343
        if (!req){
1344
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1345
                goto out_err;
1346
        }
1347
        r = xseg_prep_request(peer->xseg, req, mn->objectlen,
1348
                                sizeof(struct xseg_request_snapshot));
1349
        if (r < 0){
1350
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1351
                goto out_put;
1352
        }
1353

    
1354
        char *target = xseg_get_target(peer->xseg, req);
1355
        strncpy(target, mn->object, req->targetlen);
1356

    
1357
        struct xseg_request_snapshot *xsnapshot = (struct xseg_request_snapshot *) xseg_get_data(peer->xseg, req);
1358
        xsnapshot->target[0] = 0;
1359
        xsnapshot->targetlen = 0;
1360

    
1361
        req->offset = 0;
1362
        req->size = block_size;
1363
        req->op = X_SNAPSHOT;
1364
        r = xseg_set_req_data(peer->xseg, req, pr);
1365
        if (r<0){
1366
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1367
                goto out_put;
1368
        }
1369
        r = __set_copyup_node(mio, req, mn);
1370
        if (r < 0)
1371
                goto out_unset;
1372
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1373
        if (p == NoPort) {
1374
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1375
                goto out_mapper_unset;
1376
        }
1377
        xseg_signal(peer->xseg, p);
1378

    
1379
        mn->flags |= MF_OBJECT_SNAPSHOTTING;
1380
        XSEGLOG2(&lc, I, "Snapshotting up object %s", mn->object);
1381
        return req;
1382

    
1383
out_mapper_unset:
1384
        __set_copyup_node(mio, req, NULL);
1385
out_unset:
1386
        xseg_get_req_data(peer->xseg, req, &dummy);
1387
out_put:
1388
        xseg_put_request(peer->xseg, req, pr->portno);
1389
out_err:
1390
        XSEGLOG2(&lc, E, "Snapshotting object %s failed", mn->object);
1391
        return NULL;
1392
}
1393

    
1394
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
1395
{
1396
        struct mapperd *mapper = __get_mapperd(peer);
1397
        struct mapper_io *mio = __get_mapper_io(pr);
1398
        struct map *map = mn->map;
1399
        void *dummy;
1400
        int r = -1;
1401
        xport p;
1402

    
1403
        uint32_t newtargetlen;
1404
        char new_target[MAX_OBJECT_LEN + 1];
1405
        unsigned char sha[SHA256_DIGEST_SIZE];
1406

    
1407
        strncpy(new_target, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1408

    
1409
        char tmp[XSEG_MAX_TARGETLEN + 1];
1410
        uint32_t tmplen;
1411
        strncpy(tmp, map->volume, map->volumelen);
1412
        sprintf(tmp + map->volumelen, "_%u", mn->objectidx);
1413
        tmp[XSEG_MAX_TARGETLEN] = 0;
1414
        tmplen = strlen(tmp);
1415
        SHA256((unsigned char *)tmp, tmplen, sha);
1416
        hexlify(sha, new_target+MAPPER_PREFIX_LEN);
1417
        newtargetlen = MAPPER_PREFIX_LEN + HEXLIFIED_SHA256_DIGEST_SIZE;
1418

    
1419

    
1420
        if (!strncmp(mn->object, zero_block, ZERO_BLOCK_LEN))
1421
                goto copyup_zeroblock;
1422

    
1423
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
1424
                                                mapper->bportno, X_ALLOC);
1425
        if (!req){
1426
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1427
                goto out_err;
1428
        }
1429
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
1430
                                sizeof(struct xseg_request_copy));
1431
        if (r < 0){
1432
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
1433
                goto out_put;
1434
        }
1435

    
1436
        char *target = xseg_get_target(peer->xseg, req);
1437
        strncpy(target, new_target, req->targetlen);
1438

    
1439
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
1440
        strncpy(xcopy->target, mn->object, mn->objectlen);
1441
        xcopy->targetlen = mn->objectlen;
1442

    
1443
        req->offset = 0;
1444
        req->size = block_size;
1445
        req->op = X_COPY;
1446
        r = xseg_set_req_data(peer->xseg, req, pr);
1447
        if (r<0){
1448
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
1449
                goto out_put;
1450
        }
1451
        r = __set_copyup_node(mio, req, mn);
1452
        if (r < 0)
1453
                goto out_unset;
1454
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1455
        if (p == NoPort) {
1456
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
1457
                goto out_mapper_unset;
1458
        }
1459
        xseg_signal(peer->xseg, p);
1460
//        mio->copyups++;
1461

    
1462
        mn->flags |= MF_OBJECT_COPYING;
1463
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
1464
        return req;
1465

    
1466
out_mapper_unset:
1467
        __set_copyup_node(mio, req, NULL);
1468
out_unset:
1469
        xseg_get_req_data(peer->xseg, req, &dummy);
1470
out_put:
1471
        xseg_put_request(peer->xseg, req, pr->portno);
1472
out_err:
1473
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
1474
        return NULL;
1475

    
1476
copyup_zeroblock:
1477
        XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
1478
                        "Proceeding in writing the new object in map");
1479
        /* construct a tmp map_node for writing purposes */
1480
        struct map_node newmn = *mn;
1481
        newmn.flags = MF_OBJECT_EXIST;
1482
        strncpy(newmn.object, new_target, newtargetlen);
1483
        newmn.object[newtargetlen] = 0;
1484
        newmn.objectlen = newtargetlen;
1485
        newmn.objectidx = mn->objectidx; 
1486
        req = object_write(peer, pr, map, &newmn);
1487
        r = __set_copyup_node(mio, req, mn);
1488
        if (r < 0)
1489
                return NULL;
1490
        if (!req){
1491
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
1492
                                "\n\t of map %s [%llu]",
1493
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
1494
                return NULL;
1495
        }
1496
        mn->flags |= MF_OBJECT_WRITING;
1497
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1498
        return req;
1499
}
1500

    
1501
static struct xseg_request * __delete_object(struct peer_req *pr, struct map_node *mn)
1502
{
1503
        void *dummy;
1504
        struct peerd *peer = pr->peer;
1505
        struct mapperd *mapper = __get_mapperd(peer);
1506
        struct mapper_io *mio = __get_mapper_io(pr);
1507
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1508
                                                        mapper->bportno, X_ALLOC);
1509
        XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
1510
        if (!req){
1511
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
1512
                goto out_err;
1513
        }
1514
        int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
1515
        if (r < 0){
1516
                XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
1517
                goto out_put;
1518
        }
1519
        char *target = xseg_get_target(peer->xseg, req);
1520
        strncpy(target, mn->object, req->targetlen);
1521
        req->op = X_DELETE;
1522
        req->size = req->datalen;
1523
        req->offset = 0;
1524
        r = xseg_set_req_data(peer->xseg, req, pr);
1525
        if (r < 0){
1526
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
1527
                goto out_put;
1528
        }
1529
        r = __set_copyup_node(mio, req, mn);
1530
        if (r < 0)
1531
                goto out_unset;
1532
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1533
        if (p == NoPort){
1534
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
1535
                goto out_mapper_unset;
1536
        }
1537
        r = xseg_signal(peer->xseg, p);
1538
        mn->flags |= MF_OBJECT_DELETING;
1539
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1540
        return req;
1541

    
1542
out_mapper_unset:
1543
        __set_copyup_node(mio, req, NULL);
1544
out_unset:
1545
        xseg_get_req_data(peer->xseg, req, &dummy);
1546
out_put:
1547
        xseg_put_request(peer->xseg, req, pr->portno);
1548
out_err:
1549
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1550
        return NULL;
1551
}
1552

    
1553
static struct xseg_request * __delete_map(struct peer_req *pr, struct map *map)
1554
{
1555
        void *dummy;
1556
        struct peerd *peer = pr->peer;
1557
        struct mapperd *mapper = __get_mapperd(peer);
1558
        struct mapper_io *mio = __get_mapper_io(pr);
1559
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1560
                                                        mapper->mbportno, X_ALLOC);
1561
        XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1562
        if (!req){
1563
                XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1564
                goto out_err;
1565
        }
1566
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1567
        if (r < 0){
1568
                XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1569
                goto out_put;
1570
        }
1571
        char *target = xseg_get_target(peer->xseg, req);
1572
        strncpy(target, map->volume, req->targetlen);
1573
        req->op = X_DELETE;
1574
        req->size = req->datalen;
1575
        req->offset = 0;
1576
        r = xseg_set_req_data(peer->xseg, req, pr);
1577
        if (r < 0){
1578
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1579
                goto out_put;
1580
        }
1581
        /* do not check return value. just make sure there is no node set */
1582
        __set_copyup_node(mio, req, NULL);
1583
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1584
        if (p == NoPort){
1585
                XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1586
                goto out_unset;
1587
        }
1588
        r = xseg_signal(peer->xseg, p);
1589
        map->flags |= MF_MAP_DELETING;
1590
        XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1591
        return req;
1592

    
1593
out_unset:
1594
        xseg_get_req_data(peer->xseg, req, &dummy);
1595
out_put:
1596
        xseg_put_request(peer->xseg, req, pr->portno);
1597
out_err:
1598
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1599
        return  NULL;
1600
}
1601

    
1602

    
1603
/* Look up the map node of @map at @index and take a reference on it.
 * Returns the node, or NULL when no object is cached at that index.
 * The reference is released with put_mapnode().
 */
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
{
        struct map_node *node;

        node = find_object(map, index);
        if (!node)
                return NULL;
        node->ref++;
        return node;
}
1610

    
1611
static inline void put_mapnode(struct map_node *mn)
1612
{
1613
        mn->ref--;
1614
        if (!mn->ref){
1615
                //clean up mn
1616
                st_cond_destroy(mn->cond);
1617
        }
1618
}
1619

    
1620
/* Take an extra reference on @map (matched by put_map). */
static inline void __get_map(struct map *map)
{
        map->ref++;
}
1624

    
1625
/* Drop a reference on @map. When the last reference is gone, wait out any
 * in-flight operation on every cached object, mark each object destroyed,
 * release the object references and finally free the map itself.
 */
static inline void put_map(struct map *map)
{
        struct map_node *mn;
        map->ref--;
        if (!map->ref){
                XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
                //clean up map
                uint64_t i;
                for (i = 0; i < calc_map_obj(map); i++) {
                        mn = get_mapnode(map, i);
                        if (mn) {
                                //make sure all pending operations on all objects are completed
                                //this should never happen...
                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
                                mn->flags |= MF_OBJECT_DESTROYED;
                                put_mapnode(mn); //matching mn->ref = 1 on mn init
                                put_mapnode(mn); //matching the get_mapnode above
                                //assert mn->ref == 0;
                        }
                }
                /* NOTE(review): the node at index 0 appears to be the base
                 * pointer of one contiguous node allocation — confirm against
                 * the map-loading code before relying on this. */
                mn = find_object(map, 0);
                if (mn)
                        free(mn);
                XSEGLOG2(&lc, I, "Freed map %s", map->volume);
                free(map);
        }
}
1652

    
1653
static struct map * create_map(struct mapperd *mapper, char *name,
1654
                                uint32_t namelen, uint32_t flags)
1655
{
1656
        int r;
1657
        if (namelen + MAPPER_PREFIX_LEN > MAX_VOLUME_LEN){
1658
                XSEGLOG2(&lc, E, "Namelen %u too long. Max: %d",
1659
                                        namelen, MAX_VOLUME_LEN);
1660
                return NULL;
1661
        }
1662
        struct map *m = malloc(sizeof(struct map));
1663
        if (!m){
1664
                XSEGLOG2(&lc, E, "Cannot allocate map ");
1665
                goto out_err;
1666
        }
1667
        m->size = -1;
1668
        if (flags & MF_ARCHIP){
1669
                strncpy(m->volume, MAPPER_PREFIX, MAPPER_PREFIX_LEN);
1670
                strncpy(m->volume + MAPPER_PREFIX_LEN, name, namelen);
1671
                m->volume[MAPPER_PREFIX_LEN + namelen] = 0;
1672
                m->volumelen = MAPPER_PREFIX_LEN + namelen;
1673
                m->version = 1; /* keep this hardcoded for now */
1674
        }
1675
        else {
1676
                strncpy(m->volume, name, namelen);
1677
                m->volume[namelen] = 0;
1678
                m->volumelen = namelen;
1679
                m->version = 0; /* version 0 should be pithos maps */
1680
        }
1681
        m->flags = 0;
1682
        m->objects = xhash_new(3, 0, INTEGER); 
1683
        if (!m->objects){
1684
                XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1685
                                m->volume);
1686
                goto out_map;
1687
        }
1688
        m->ref = 1;
1689
        m->waiters = 0;
1690
        m->cond = st_cond_new(); //FIXME err check;
1691
        r = insert_map(mapper, m);
1692
        if (r < 0){
1693
                XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1694
                goto out_hash;
1695
        }
1696

    
1697
        return m;
1698

    
1699
out_hash:
1700
        xhash_free(m->objects);
1701
out_map:
1702
        XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1703
        free(m);
1704
out_err:
1705
        return NULL;
1706
}
1707

    
1708

    
1709

    
1710
/* Callback for X_DELETE replies issued during map destruction.
 *
 * Accounts the completed deletion (mio->del_pending), records failure in
 * mio->err, clears the DELETING state of the map node registered for this
 * request (if any), and wakes both node waiters and the peer request that
 * drives the destroy.
 */
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn = __get_copyup_node(mio, req);

        __set_copyup_node(mio, req, NULL);

        //assert req->op = X_DELETE;
        //assert pr->req->op = X_DELETE only map deletions make delete requests
        //assert mio->del_pending > 0
        XSEGLOG2(&lc, D, "mio: %lx, del_pending: %llu", mio, mio->del_pending);
        mio->del_pending--;

        if (req->state & XS_FAILED){
                mio->err = 1;
        }
        if (mn){
                XSEGLOG2(&lc, D, "Found mapnode %lx %s for mio: %lx, req: %lx",
                                mn, mn->object, mio, req);
                // assert mn->flags & MF_OBJECT_DELETING
                mn->flags &= ~MF_OBJECT_DELETING;
                mn->flags |= MF_OBJECT_DESTROYED;
                signal_mapnode(mn);
                /* put mapnode here, matches get_mapnode on do_destroy */
                put_mapnode(mn);
        } else {
                /* NOTE(review): every delete tracked through this callback
                 * should have a node registered; a missing node indicates
                 * broken copyup-node accounting. */
                XSEGLOG2(&lc, E, "Cannot get map node for mio: %lx, req: %lx",
                                mio, req);
        }
        xseg_put_request(peer->xseg, req, pr->portno);
        signal_pr(pr);
}
1745

    
1746
void snapshot_cb(struct peer_req *pr, struct xseg_request *req)
1747
{
1748
        struct peerd *peer = pr->peer;
1749
        struct mapperd *mapper = __get_mapperd(peer);
1750
        (void)mapper;
1751
        struct mapper_io *mio = __get_mapper_io(pr);
1752
        struct map_node *mn = __get_copyup_node(mio, req);
1753
        if (!mn){
1754
                XSEGLOG2(&lc, E, "Cannot get map node");
1755
                goto out_err;
1756
        }
1757
        __set_copyup_node(mio, req, NULL);
1758

    
1759
        if (req->state & XS_FAILED){
1760
                if (req->op == X_DELETE){
1761
                        XSEGLOG2(&lc, E, "Delete req failed");
1762
                        goto out_ok;
1763
                }
1764
                XSEGLOG2(&lc, E, "Req failed");
1765
                mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1766
                mn->flags &= ~MF_OBJECT_WRITING;
1767
                goto out_err;
1768
        }
1769

    
1770
        if (req->op == X_WRITE) {
1771
                char old_object_name[MAX_OBJECT_LEN + 1];
1772
                uint32_t old_objectlen;
1773

    
1774
                char *target = xseg_get_target(peer->xseg, req);
1775
                (void)target;
1776
                //assert mn->flags & MF_OBJECT_WRITING
1777
                mn->flags &= ~MF_OBJECT_WRITING;
1778
                strncpy(old_object_name, mn->object, mn->objectlen);
1779
                old_objectlen = mn->objectlen;
1780

    
1781
                struct map_node tmp;
1782
                char *data = xseg_get_data(peer->xseg, req);
1783
                map_to_object(&tmp, (unsigned char *) data);
1784
                mn->flags &= ~MF_OBJECT_EXIST;
1785

    
1786
                strncpy(mn->object, tmp.object, tmp.objectlen);
1787
                mn->object[tmp.objectlen] = 0;
1788
                mn->objectlen = tmp.objectlen;
1789
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1790
                //signal_mapnode since Snapshot was successfull
1791
                signal_mapnode(mn);
1792

    
1793
                //do delete old object
1794
                strncpy(tmp.object, old_object_name, old_objectlen);
1795
                tmp.object[old_objectlen] = 0;
1796
                tmp.objectlen = old_objectlen;
1797
                tmp.flags = MF_OBJECT_EXIST;
1798
                struct xseg_request *xreq = __delete_object(pr, &tmp);
1799
                if (!xreq){
1800
                        //just a warning. Snapshot was successfull
1801
                        XSEGLOG2(&lc, W, "Cannot delete old object %s", tmp.object);
1802
                        goto out_ok;
1803
                }
1804
                //overwrite copyup node, since tmp is a stack dummy variable
1805
                __set_copyup_node (mio, xreq, mn);
1806
                XSEGLOG2(&lc, I, "Deletion of %s pending", tmp.object);
1807
        } else if (req->op == X_SNAPSHOT) {
1808
                //issue write_object;
1809
                mn->flags &= ~MF_OBJECT_SNAPSHOTTING;
1810
                struct map *map = mn->map;
1811
                if (!map){
1812
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1813
                        goto out_err;
1814
                }
1815

    
1816
                /* construct a tmp map_node for writing purposes */
1817
                //char *target = xseg_get_target(peer->xseg, req);
1818
                struct map_node newmn = *mn;
1819
                newmn.flags = 0;
1820
                struct xseg_reply_snapshot *xreply;
1821
                xreply = (struct xseg_reply_snapshot *) xseg_get_data(peer->xseg, req);
1822
                //assert xreply->targetlen !=0
1823
                //assert xreply->targetlen < XSEG_MAX_TARGETLEN
1824
                //xreply->target[xreply->targetlen] = 0;
1825
                //assert xreply->target valid
1826
                strncpy(newmn.object, xreply->target, xreply->targetlen);
1827
                newmn.object[req->targetlen] = 0;
1828
                newmn.objectlen = req->targetlen;
1829
                newmn.objectidx = mn->objectidx;
1830
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1831
                if (!xreq){
1832
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
1833
                                        "\n\t of map %s [%llu]",
1834
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1835
                        goto out_err;
1836
                }
1837
                mn->flags |= MF_OBJECT_WRITING;
1838
                __set_copyup_node (mio, xreq, mn);
1839

    
1840
                XSEGLOG2(&lc, I, "Object %s snapshot completed. Pending writing.", mn->object);
1841
        } else if (req->op == X_DELETE){
1842
                //deletion of the old block completed
1843
                XSEGLOG2(&lc, I, "Deletion of completed");
1844
                goto out_ok;
1845
                ;
1846
        } else {
1847
                //wtf??
1848
                ;
1849
        }
1850

    
1851
out:
1852
        xseg_put_request(peer->xseg, req, pr->portno);
1853
        return;
1854

    
1855
out_err:
1856
        mio->snap_pending--;
1857
        XSEGLOG2(&lc, D, "Mio->snap_pending: %u", mio->snap_pending);
1858
        mio->err = 1;
1859
        if (mn)
1860
                signal_mapnode(mn);
1861
        signal_pr(pr);
1862
        goto out;
1863

    
1864
out_ok:
1865
        mio->snap_pending--;
1866
        signal_pr(pr);
1867
        goto out;
1868

    
1869

    
1870
}
1871
/* Callback for copy-up traffic: the X_COPY of a backing object followed by
 * the X_WRITE that persists the new object entry in the map.
 *
 * X_COPY reply:  clears MF_OBJECT_COPYING and issues object_write() for the
 *                new entry (tracked again through the copyup node).
 * X_WRITE reply: clears MF_OBJECT_WRITING, installs the new object name on
 *                the map node, and wakes node and request waiters.
 * Any failure sets mio->err and wakes everyone up.
 */
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
{
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        (void)mapper;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn = __get_copyup_node(mio, req);
        if (!mn){
                XSEGLOG2(&lc, E, "Cannot get map node");
                goto out_err;
        }
        __set_copyup_node(mio, req, NULL);

        if (req->state & XS_FAILED){
                XSEGLOG2(&lc, E, "Req failed");
                mn->flags &= ~MF_OBJECT_COPYING;
                mn->flags &= ~MF_OBJECT_WRITING;
                goto out_err;
        }
        if (req->op == X_WRITE) {
                char *target = xseg_get_target(peer->xseg, req);
                (void)target;
                //printf("handle object write replyi\n");
                /* already cleared above; harmless repeat */
                __set_copyup_node(mio, req, NULL);
                //assert mn->flags & MF_OBJECT_WRITING
                mn->flags &= ~MF_OBJECT_WRITING;

                struct map_node tmp;
                char *data = xseg_get_data(peer->xseg, req);
                map_to_object(&tmp, (unsigned char *) data);
                mn->flags |= MF_OBJECT_EXIST;
                /* NOTE(review): strict equality — any leftover flag besides
                 * MF_OBJECT_EXIST is treated as an error. Confirm this is
                 * intended rather than a bitwise test. */
                if (mn->flags != MF_OBJECT_EXIST){
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
                        goto out_err;
                }
                //assert mn->flags & MF_OBJECT_EXIST
                strncpy(mn->object, tmp.object, tmp.objectlen);
                mn->object[tmp.objectlen] = 0;
                mn->objectlen = tmp.objectlen;
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
                mio->copyups--;
                signal_mapnode(mn);
                signal_pr(pr);
        } else if (req->op == X_COPY) {
        //        issue write_object;
                mn->flags &= ~MF_OBJECT_COPYING;
                struct map *map = mn->map;
                if (!map){
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
                        goto out_err;
                }

                /* construct a tmp map_node for writing purposes */
                char *target = xseg_get_target(peer->xseg, req);
                struct map_node newmn = *mn;
                newmn.flags = MF_OBJECT_EXIST;
                strncpy(newmn.object, target, req->targetlen);
                newmn.object[req->targetlen] = 0;
                newmn.objectlen = req->targetlen;
                newmn.objectidx = mn->objectidx; 
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
                if (!xreq){
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
                                        "\n\t of map %s [%llu]",
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
                        goto out_err;
                }
                mn->flags |= MF_OBJECT_WRITING;
                __set_copyup_node (mio, xreq, mn);

                XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
        } else {
                //wtf??
                ;
        }

out:
        xseg_put_request(peer->xseg, req, pr->portno);
        return;

out_err:
        mio->copyups--;
        XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
        mio->err = 1;
        if (mn)
                signal_mapnode(mn);
        signal_pr(pr);
        goto out;

}
1961

    
1962
/* One slice of a client request mapped onto a single object:
 * @mn is the object's map node, @offset/@size delimit the slice
 * inside that object. */
struct r2o {
        struct map_node *mn;
        uint64_t offset;
        uint64_t size;
};
1967

    
1968
static int req2objs(struct peer_req *pr, struct map *map, int write)
1969
{
1970
        int r = 0;
1971
        struct peerd *peer = pr->peer;
1972
        struct mapper_io *mio = __get_mapper_io(pr);
1973
        char *target = xseg_get_target(peer->xseg, pr->req);
1974
        uint32_t nr_objs = calc_nr_obj(pr->req);
1975
        uint64_t size = sizeof(struct xseg_reply_map) + 
1976
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1977
        uint32_t idx, i;
1978
        uint64_t rem_size, obj_index, obj_offset, obj_size; 
1979
        struct map_node *mn;
1980
        mio->copyups = 0;
1981
        XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1982

    
1983
        /* get map_nodes of request */
1984
        struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1985
        if (!mns){
1986
                XSEGLOG2(&lc, E, "Cannot allocate mns");
1987
                return -1;
1988
        }
1989
        idx = 0;
1990
        rem_size = pr->req->size;
1991
        obj_index = pr->req->offset / block_size;
1992
        obj_offset = pr->req->offset & (block_size -1); //modulo
1993
        obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1994
        mn = get_mapnode(map, obj_index);
1995
        if (!mn) {
1996
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1997
                r = -1;
1998
                goto out;
1999
        }
2000
        mns[idx].mn = mn;
2001
        mns[idx].offset = obj_offset;
2002
        mns[idx].size = obj_size;
2003
        rem_size -= obj_size;
2004
        while (rem_size > 0) {
2005
                idx++;
2006
                obj_index++;
2007
                obj_offset = 0;
2008
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
2009
                rem_size -= obj_size;
2010
                mn = get_mapnode(map, obj_index);
2011
                if (!mn) {
2012
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
2013
                        r = -1;
2014
                        goto out;
2015
                }
2016
                mns[idx].mn = mn;
2017
                mns[idx].offset = obj_offset;
2018
                mns[idx].size = obj_size;
2019
        }
2020
        if (write) {
2021
                int can_wait = 0;
2022
                mio->cb=copyup_cb;
2023
                /* do a first scan and issue as many copyups as we can.
2024
                 * then retry and wait when an object is not ready.
2025
                 * this could be done better, since now we wait also on the
2026
                 * pending copyups
2027
                 */
2028
                int j;
2029
                for (j = 0; j < 2 && !mio->err; j++) {
2030
                        for (i = 0; i < (idx+1); i++) {
2031
                                mn = mns[i].mn;
2032
                                //do copyups
2033
                                if (mn->flags & MF_OBJECT_NOT_READY){
2034
                                        if (!can_wait)
2035
                                                continue;
2036
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2037
                                        if (mn->flags & MF_OBJECT_DESTROYED){
2038
                                                mio->err = 1;
2039
                                                continue;
2040
                                        }
2041
                                }
2042

    
2043
                                if (!(mn->flags & MF_OBJECT_EXIST)) {
2044
                                        //calc new_target, copy up object
2045
                                        if (copyup_object(peer, mn, pr) == NULL){
2046
                                                XSEGLOG2(&lc, E, "Error in copy up object");
2047
                                                mio->err = 1;
2048
                                        } else {
2049
                                                mio->copyups++;
2050
                                        }
2051
                                }
2052

    
2053
                                if (mio->err){
2054
                                        XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
2055
                                        break;
2056
                                }
2057
                        }
2058
                        can_wait = 1;
2059
                }
2060
                wait_on_pr(pr, mio->copyups > 0);
2061
        }
2062

    
2063
        if (mio->err){
2064
                r = -1;
2065
                XSEGLOG2(&lc, E, "Mio->err");
2066
                goto out;
2067
        }
2068

    
2069
        /* resize request to fit reply */
2070
        char buf[XSEG_MAX_TARGETLEN];
2071
        strncpy(buf, target, pr->req->targetlen);
2072
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
2073
        if (r < 0) {
2074
                XSEGLOG2(&lc, E, "Cannot resize request");
2075
                goto out;
2076
        }
2077
        target = xseg_get_target(peer->xseg, pr->req);
2078
        strncpy(target, buf, pr->req->targetlen);
2079

    
2080
        /* structure reply */
2081
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2082
        reply->cnt = nr_objs;
2083
        for (i = 0; i < (idx+1); i++) {
2084
                strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
2085
                reply->segs[i].targetlen = mns[i].mn->objectlen;
2086
                reply->segs[i].offset = mns[i].offset;
2087
                reply->segs[i].size = mns[i].size;
2088
        }
2089
out:
2090
        for (i = 0; i < idx; i++) {
2091
                put_mapnode(mns[i].mn);
2092
        }
2093
        free(mns);
2094
        mio->cb = NULL;
2095
        return r;
2096
}
2097

    
2098
/* Evict all cached state of @map from the mapper.
 *
 * Waits for pending operations on each cached object, marks every object
 * destroyed, drops each node's cache reference, unlinks the map from the
 * mapper hash and finally drops the creation reference so the map itself
 * is freed. Always returns 0.
 */
static int do_dropcache(struct peer_req *pr, struct map *map)
{
        struct map_node *mn;
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        uint64_t i;
        XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
        map->flags |= MF_MAP_DROPPING_CACHE;
        for (i = 0; i < calc_map_obj(map); i++) {
                mn = get_mapnode(map, i);
                if (mn) {
                        if (!(mn->flags & MF_OBJECT_DESTROYED)){
                                //make sure all pending operations on all objects are completed
                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
                                mn->flags |= MF_OBJECT_DESTROYED;
                        }
                        put_mapnode(mn); /* matches the get_mapnode above */
                }
        }
        map->flags &= ~MF_MAP_DROPPING_CACHE;
        map->flags |= MF_MAP_DESTROYED;
        remove_map(mapper, map);
        XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
        put_map(map);        // put map here to destroy it (matches m->ref = 1 on map create)
        return 0;
}
2124

    
2125
static int do_info(struct peer_req *pr, struct map *map)
2126
{
2127
        struct peerd *peer = pr->peer;
2128
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
2129
        xinfo->size = map->size;
2130
        return 0;
2131
}
2132

    
2133

    
2134
static int do_open(struct peer_req *pr, struct map *map)
2135
{
2136
        if (map->flags & MF_MAP_EXCLUSIVE){
2137
                return 0;
2138
        }
2139
        else {
2140
                return -1;
2141
        }
2142
}
2143

    
2144
static int do_close(struct peer_req *pr, struct map *map)
2145
{
2146
        if (map->flags & MF_MAP_EXCLUSIVE){
2147
                /* do not drop cache if close failed and map not deleted */
2148
                if (close_map(pr, map) < 0 && !(map->flags & MF_MAP_DELETED))
2149
                        return -1;
2150
        }
2151
        return do_dropcache(pr, map);
2152
}
2153

    
2154
static int do_snapshot(struct peer_req *pr, struct map *map)
2155
{
2156
        uint64_t i;
2157
        struct peerd *peer = pr->peer;
2158
        struct mapper_io *mio = __get_mapper_io(pr);
2159
        struct map_node *mn;
2160
        struct xseg_request *req;
2161

    
2162
        if (!(map->flags & MF_MAP_EXCLUSIVE)){
2163
                XSEGLOG2(&lc, E, "Map was not opened exclusively");
2164
                return -1;
2165
        }
2166
        XSEGLOG2(&lc, I, "Starting snapshot for map %s", map->volume);
2167
        map->flags |= MF_MAP_SNAPSHOTTING;
2168

    
2169
        uint64_t nr_obj = calc_map_obj(map);
2170
        mio->cb = snapshot_cb;
2171
        mio->snap_pending = 0;
2172
        mio->err = 0;
2173
        for (i = 0; i < nr_obj; i++){
2174

    
2175
                /* throttle pending snapshots
2176
                 * this should be nr_ops of the blocker, but since we don't know
2177
                 * that, we assume based on our own nr_ops
2178
                 */
2179
                wait_on_pr(pr, mio->snap_pending >= peer->nr_ops);
2180

    
2181
                mn = get_mapnode(map, i);
2182
                if (!mn)
2183
                        //warning?
2184
                        continue;
2185
                if (!(mn->flags & MF_OBJECT_EXIST)){
2186
                        put_mapnode(mn);
2187
                        continue;
2188
                }
2189
                // make sure all pending operations on all objects are completed
2190
                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
2191

    
2192
                /* TODO will this ever happen?? */
2193
                if (mn->flags & MF_OBJECT_DESTROYED){
2194
                        put_mapnode(mn);
2195
                        continue;
2196
                }
2197

    
2198
                req = __snapshot_object(pr, mn);
2199
                if (!req){
2200
                        mio->err = 1;
2201
                        put_mapnode(mn);
2202
                        break;
2203
                }
2204
                mio->snap_pending++;
2205
                /* do not put_mapnode here. cb does that */
2206
        }
2207

    
2208
        wait_on_pr(pr, mio->snap_pending > 0);
2209
        mio->cb = NULL;
2210

    
2211
        if (mio->err)
2212
                goto out_err;
2213

    
2214
        /* calculate name of snapshot */
2215
        struct map tmp_map = *map;
2216
        unsigned char sha[SHA256_DIGEST_SIZE];
2217
        unsigned char *buf = malloc(block_size);
2218
        char newvolumename[MAX_VOLUME_LEN];
2219
        uint32_t newvolumenamelen = HEXLIFIED_SHA256_DIGEST_SIZE;
2220
        uint64_t pos = 0;
2221
        uint64_t max_objidx = calc_map_obj(map);
2222
        int r;
2223

    
2224
        for (i = 0; i < max_objidx; i++) {
2225
                mn = find_object(map, i);
2226
                if (!mn){
2227
                        XSEGLOG2(&lc, E, "Cannot find object %llu for map %s",
2228
                                        (unsigned long long) i, map->volume);
2229
                        goto out_err;
2230
                }
2231
                v0_object_to_map(mn, buf+pos);
2232
                pos += v0_objectsize_in_map;
2233
        }
2234
//        SHA256(buf, pos, sha);
2235
        merkle_hash(buf, pos, sha);
2236
        hexlify(sha, newvolumename);
2237
        strncpy(tmp_map.volume, newvolumename, newvolumenamelen);
2238
        tmp_map.volumelen = newvolumenamelen;
2239
        free(buf);
2240
        tmp_map.version = 0; // set volume version to pithos image
2241

    
2242
        /* write the map of the Snapshot */
2243
        r = write_map(pr, &tmp_map);
2244
        if (r < 0)
2245
                goto out_err;
2246
        char targetbuf[XSEG_MAX_TARGETLEN];
2247
        char *target = xseg_get_target(peer->xseg, pr->req);
2248
        strncpy(targetbuf, target, pr->req->targetlen);
2249
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen,
2250
                        sizeof(struct xseg_reply_snapshot));
2251
        if (r < 0){
2252
                XSEGLOG2(&lc, E, "Cannot resize request");
2253
                goto out_err;
2254
        }
2255
        target = xseg_get_target(peer->xseg, pr->req);
2256
        strncpy(target, targetbuf, pr->req->targetlen);
2257

    
2258
        struct xseg_reply_snapshot *xreply = (struct xseg_reply_snapshot *)
2259
                                                xseg_get_data(peer->xseg, pr->req);
2260
        strncpy(xreply->target, newvolumename, newvolumenamelen);
2261
        xreply->targetlen = newvolumenamelen;
2262
        map->flags &= ~MF_MAP_SNAPSHOTTING;
2263
        XSEGLOG2(&lc, I, "Snapshot for map %s completed", map->volume);
2264
        return 0;
2265

    
2266
out_err:
2267
        map->flags &= ~MF_MAP_SNAPSHOTTING;
2268
        XSEGLOG2(&lc, E, "Snapshot for map %s failed", map->volume);
2269
        return -1;
2270
}
2271

    
2272

    
2273
/* Destroy @map: delete the map blob itself, then every existing object,
 * and finally close the (exclusively held) map and drop its cache.
 *
 * The map blob deletion is waited for synchronously; per-object deletions
 * are issued asynchronously (throttled by peer->nr_ops) and completed via
 * deletion_cb. Returns 0 on success, -1 on failure.
 */
static int do_destroy(struct peer_req *pr, struct map *map)
{
        uint64_t i;
        struct peerd *peer = pr->peer;
        struct mapper_io *mio = __get_mapper_io(pr);
        struct map_node *mn;
        struct xseg_request *req;

        if (!(map->flags & MF_MAP_EXCLUSIVE))
                return -1;

        XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
        req = __delete_map(pr, map);
        if (!req)
                return -1;
        /* block until the map blob deletion is served or fails */
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
        if (req->state & XS_FAILED){
                xseg_put_request(peer->xseg, req, pr->portno);
                map->flags &= ~MF_MAP_DELETING;
                return -1;
        }
        xseg_put_request(peer->xseg, req, pr->portno);

        uint64_t nr_obj = calc_map_obj(map);
        mio->cb = deletion_cb;
        mio->del_pending = 0;
        mio->err = 0;
        for (i = 0; i < nr_obj; i++){

                /* throttle pending deletions
                 * this should be nr_ops of the blocker, but since we don't know
                 * that, we assume based on our own nr_ops
                 */
                wait_on_pr(pr, mio->del_pending >= peer->nr_ops);

                mn = get_mapnode(map, i);
                if (!mn)
                        continue;
                if (mn->flags & MF_OBJECT_DESTROYED){
                        put_mapnode(mn);
                        continue;
                }
                if (!(mn->flags & MF_OBJECT_EXIST)){
                        mn->flags |= MF_OBJECT_DESTROYED;
                        put_mapnode(mn);
                        continue;
                }

                // make sure all pending operations on all objects are completed
                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);

                req = __delete_object(pr, mn);
                if (!req){
                        mio->err = 1;
                        put_mapnode(mn);
                        continue;
                }
                mio->del_pending++;
                /* do not put_mapnode here. cb does that */
        }

        /* wait for all outstanding object deletions to complete */
        wait_on_pr(pr, mio->del_pending > 0);

        mio->cb = NULL;
        map->flags &= ~MF_MAP_DELETING;
        map->flags |= MF_MAP_DELETED;
        /* NOTE(review): mio->err is not checked here — failed object
         * deletions still report overall success. Confirm this is intended. */
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
        return do_close(pr, map);
}
2342

    
2343
static int do_mapr(struct peer_req *pr, struct map *map)
2344
{
2345
        struct peerd *peer = pr->peer;
2346
        int r = req2objs(pr, map, 0);
2347
        if  (r < 0){
2348
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
2349
                                map->volume, 
2350
                                (unsigned long long) pr->req->offset, 
2351
                                (unsigned long long) (pr->req->offset + pr->req->size));
2352
                return -1;
2353
        }
2354
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
2355
                        map->volume, 
2356
                        (unsigned long long) pr->req->offset, 
2357
                        (unsigned long long) (pr->req->offset + pr->req->size));
2358
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2359
                        (unsigned long long) pr->req->offset,
2360
                        (unsigned long long) pr->req->size);
2361
        char buf[XSEG_MAX_TARGETLEN+1];
2362
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2363
        int i;
2364
        for (i = 0; i < reply->cnt; i++) {
2365
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2366
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2367
                buf[reply->segs[i].targetlen] = 0;
2368
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2369
                                (unsigned long long) reply->segs[i].offset,
2370
                                (unsigned long long) reply->segs[i].size);
2371
        }
2372
        return 0;
2373
}
2374

    
2375
static int do_mapw(struct peer_req *pr, struct map *map)
2376
{
2377
        struct peerd *peer = pr->peer;
2378
        int r = req2objs(pr, map, 1);
2379
        if  (r < 0){
2380
                XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
2381
                                map->volume, 
2382
                                (unsigned long long) pr->req->offset, 
2383
                                (unsigned long long) (pr->req->offset + pr->req->size));
2384
                return -1;
2385
        }
2386
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
2387
                        map->volume, 
2388
                        (unsigned long long) pr->req->offset, 
2389
                        (unsigned long long) (pr->req->offset + pr->req->size));
2390
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
2391
                        (unsigned long long) pr->req->offset,
2392
                        (unsigned long long) pr->req->size);
2393
        char buf[XSEG_MAX_TARGETLEN+1];
2394
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
2395
        int i;
2396
        for (i = 0; i < reply->cnt; i++) {
2397
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
2398
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
2399
                buf[reply->segs[i].targetlen] = 0;
2400
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
2401
                                (unsigned long long) reply->segs[i].offset,
2402
                                (unsigned long long) reply->segs[i].size);
2403
        }
2404
        return 0;
2405
}
2406

    
2407
//here map is the parent map
/*
 * Clone the (already loaded) parent map into a new volume named by the
 * request target.  The clone gets one map node per object: parent nodes
 * are copied by name, holes are filled with the zero block.  The new map
 * is written out and then closed (which also drops it from the cache).
 *
 * Returns 0 on success, -1 on failure.  On any failure after the clone
 * map was created, out_err closes the clone map before returning.
 */
static int do_clone(struct peer_req *pr, struct map *map)
{
        int r;
        struct peerd *peer = pr->peer;
        struct mapperd *mapper = __get_mapperd(peer);
        char *target = xseg_get_target(peer->xseg, pr->req);
        struct map *clonemap;
        struct xseg_request_clone *xclone =
                (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);

        XSEGLOG2(&lc, I, "Cloning map %s", map->volume);

        clonemap = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
        if (!clonemap)
                return -1;

        /* open map to get exclusive access to map */
        r = open_map(pr, clonemap, 0);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot open map %s", clonemap->volume);
                XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
                goto out_err;
        }
        /* A successful load means the target already exists on disk, so
         * the clone must be refused; only a failed load lets us proceed. */
        r = load_map(pr, clonemap);
        if (r >= 0) {
                XSEGLOG2(&lc, E, "Target volume %s exists", clonemap->volume);
                goto out_err;
        }

        /* size == -1 is the "inherit the parent's size" sentinel
         * (presumably xclone->size is unsigned, so this compares against
         * its all-ones value — TODO confirm against xseg_request_clone). */
        if (xclone->size == -1)
                clonemap->size = map->size;
        else
                clonemap->size = xclone->size;
        /* A clone may grow but never shrink the parent image. */
        if (clonemap->size < map->size){
                XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
                                "\n\t for requested clone %s",
                                (unsigned long long) xclone->size,
                                (unsigned long long) map->size, clonemap->volume);
                goto out_err;
        }
        if (clonemap->size > MAX_VOLUME_SIZE) {
                XSEGLOG2(&lc, E, "Requested size %llu > max volume size %llu"
                                "\n\t for volume %s",
                                clonemap->size, MAX_VOLUME_SIZE, clonemap->volume);
                goto out_err;
        }

        //alloc and init map_nodes
        //unsigned long c = clonemap->size/block_size + 1;
        unsigned long c = calc_map_obj(clonemap);
        /* NOTE(review): map_nodes is never freed on the out_err paths below;
         * presumably do_close reclaims inserted nodes, but nodes not yet
         * inserted when insert_object fails look leaked — verify. */
        struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
        if (!map_nodes){
                goto out_err;
        }
        int i;
        //for (i = 0; i < clonemap->size/block_size + 1; i++) {
        for (i = 0; i < c; i++) {
                /* Copy the parent's object name if present; otherwise point
                 * the node at the shared zero block. */
                struct map_node *mn = get_mapnode(map, i);
                if (mn) {
                        strncpy(map_nodes[i].object, mn->object, mn->objectlen);
                        map_nodes[i].objectlen = mn->objectlen;
                        put_mapnode(mn);
                } else {
                        strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
                        map_nodes[i].objectlen = ZERO_BLOCK_LEN;
                }
                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
                map_nodes[i].flags = 0;
                map_nodes[i].objectidx = i;
                map_nodes[i].map = clonemap;
                map_nodes[i].ref = 1;
                map_nodes[i].waiters = 0;
                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
                r = insert_object(clonemap, &map_nodes[i]);
                if (r < 0){
                        XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
                        goto out_err;
                }
        }

        /* Persist the freshly built clone map. */
        r = write_map(pr, clonemap);
        if (r < 0){
                XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
                goto out_err;
        }
        do_close(pr, clonemap);
        return 0;

out_err:
        do_close(pr, clonemap);
        return -1;
}
2500

    
2501
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
2502
{
2503
        int r, opened = 0;
2504
        if (flags & MF_EXCLUSIVE){
2505
                r = open_map(pr, map, flags);
2506
                if (r < 0) {
2507
                        if (flags & MF_FORCE){
2508
                                return -1;
2509
                        }
2510
                } else {
2511
                        opened = 1;
2512
                }
2513
        }
2514
        r = load_map(pr, map);
2515
        if (r < 0 && opened){
2516
                close_map(pr, map);
2517
        }
2518
        return r;
2519
}
2520

    
2521
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen,
2522
                        uint32_t flags)
2523
{
2524
        int r;
2525
        struct peerd *peer = pr->peer;
2526
        struct mapperd *mapper = __get_mapperd(peer);
2527
        struct map *map = find_map_len(mapper, name, namelen, flags);
2528
        if (!map){
2529
                if (flags & MF_LOAD){
2530
                        map = create_map(mapper, name, namelen, flags);
2531
                        if (!map)
2532
                                return NULL;
2533
                        r = open_load_map(pr, map, flags);
2534
                        if (r < 0){
2535
                                do_dropcache(pr, map);
2536
                                return NULL;
2537
                        }
2538
                } else {
2539
                        return NULL;
2540
                }
2541
        } else if (map->flags & MF_MAP_DESTROYED){
2542
                return NULL;
2543
        }
2544
        __get_map(map);
2545
        return map;
2546

    
2547
}
2548

    
2549
static int map_action(int (action)(struct peer_req *pr, struct map *map),
2550
                struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
2551
{
2552
        //struct peerd *peer = pr->peer;
2553
        struct map *map;
2554
start:
2555
        map = get_map(pr, name, namelen, flags);
2556
        if (!map)
2557
                return -1;
2558
        if (map->flags & MF_MAP_NOT_READY){
2559
                wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
2560
                put_map(map);
2561
                goto start;
2562
        }
2563
        int r = action(pr, map);
2564
        //always drop cache if map not read exclusively
2565
        if (!(map->flags & MF_MAP_EXCLUSIVE))
2566
                do_dropcache(pr, map);
2567
        signal_map(map);
2568
        put_map(map);
2569
        return r;
2570
}
2571

    
2572
void * handle_info(struct peer_req *pr)
2573
{
2574
        struct peerd *peer = pr->peer;
2575
        char *target = xseg_get_target(peer->xseg, pr->req);
2576
        int r = map_action(do_info, pr, target, pr->req->targetlen,
2577
                                MF_ARCHIP|MF_LOAD);
2578
        if (r < 0)
2579
                fail(peer, pr);
2580
        else
2581
                complete(peer, pr);
2582
        ta--;
2583
        return NULL;
2584
}
2585

    
2586
/*
 * State-thread entry point for X_CLONE.
 *
 * Two modes, selected by the request data:
 *  - xclone->targetlen != 0: clone an existing (snapshot) map via
 *    do_clone, loading the parent map first.
 *  - xclone->targetlen == 0: create a brand new empty volume of
 *    xclone->size, populated entirely with zero-block objects, then
 *    write the map out.
 *
 * Completes or fails the peer request accordingly and decrements the
 * active-thread counter before returning.
 */
void * handle_clone(struct peer_req *pr)
{
        int r;
        struct peerd *peer = pr->peer;
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
        if (!xclone) {
                r = -1;
                goto out;
        }

        if (xclone->targetlen){
                /* if snap was defined */
                //support clone only from pithos
                r = map_action(do_clone, pr, xclone->target, xclone->targetlen,
                                        MF_LOAD);
        } else {
                /* else try to create a new volume */
                XSEGLOG2(&lc, I, "Creating volume");
                if (!xclone->size){
                        XSEGLOG2(&lc, E, "Cannot create volume. Size not specified");
                        r = -1;
                        goto out;
                }
                if (xclone->size > MAX_VOLUME_SIZE) {
                        XSEGLOG2(&lc, E, "Requested size %llu > max volume "
                                        "size %llu", xclone->size, MAX_VOLUME_SIZE);
                        r = -1;
                        goto out;
                }

                struct map *map;
                char *target = xseg_get_target(peer->xseg, pr->req);

                //create a new empty map of size
                map = create_map(mapper, target, pr->req->targetlen, MF_ARCHIP);
                if (!map){
                        r = -1;
                        goto out;
                }
                /* open map to get exclusive access to map */
                r = open_map(pr, map, 0);
                if (r < 0){
                        XSEGLOG2(&lc, E, "Cannot open map %s", map->volume);
                        XSEGLOG2(&lc, E, "Target volume %s exists", map->volume);
                        do_dropcache(pr, map);
                        r = -1;
                        goto out;
                }
                /* A successful load means the volume already exists; refuse
                 * to overwrite it. */
                r = load_map(pr, map);
                if (r >= 0) {
                        XSEGLOG2(&lc, E, "Map exists %s", map->volume);
                        do_close(pr, map);
                        r = -1;
                        goto out;
                }
                map->size = xclone->size;
                //populate_map with zero objects;
                /* Round the object count up to cover a partial last block. */
                uint64_t nr_objs = xclone->size / block_size;
                if (xclone->size % block_size)
                        nr_objs++;

                /* NOTE(review): map_nodes is not freed on the failure paths
                 * below; presumably do_close reclaims inserted nodes, but
                 * nodes past a failed insert_object look leaked — verify. */
                struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
                if (!map_nodes){
                        do_close(pr, map);
                        r = -1;
                        goto out;
                }

                /* Every object of a fresh volume starts as the zero block. */
                uint64_t i;
                for (i = 0; i < nr_objs; i++) {
                        strncpy(map_nodes[i].object, zero_block, ZERO_BLOCK_LEN);
                        map_nodes[i].objectlen = ZERO_BLOCK_LEN;
                        map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
                        map_nodes[i].flags = 0;
                        map_nodes[i].objectidx = i;
                        map_nodes[i].map = map;
                        map_nodes[i].ref = 1;
                        map_nodes[i].waiters = 0;
                        map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
                        r = insert_object(map, &map_nodes[i]);
                        if (r < 0){
                                do_close(pr, map);
                                r = -1;
                                goto out;
                        }
                }
                r = write_map(pr, map);
                if (r < 0){
                        XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
                        do_close(pr, map);
                        goto out;
                }
                XSEGLOG2(&lc, I, "Volume %s created", map->volume);
                r = 0;
                do_close(pr, map); //drop cache here for consistency
        }
out:
        if (r < 0)
                fail(peer, pr);
        else
                complete(peer, pr);
        ta--;
        return NULL;
}
2690

    
2691
void * handle_mapr(struct peer_req *pr)
2692
{
2693
        struct peerd *peer = pr->peer;
2694
        char *target = xseg_get_target(peer->xseg, pr->req);
2695
        int r = map_action(do_mapr, pr, target, pr->req->targetlen,
2696
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2697
        if (r < 0)
2698
                fail(peer, pr);
2699
        else
2700
                complete(peer, pr);
2701
        ta--;
2702
        return NULL;
2703
}
2704

    
2705
void * handle_mapw(struct peer_req *pr)
2706
{
2707
        struct peerd *peer = pr->peer;
2708
        char *target = xseg_get_target(peer->xseg, pr->req);
2709
        int r = map_action(do_mapw, pr, target, pr->req->targetlen,
2710
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
2711
        if (r < 0)
2712
                fail(peer, pr);
2713
        else
2714
                complete(peer, pr);
2715
        XSEGLOG2(&lc, D, "Ta: %d", ta);
2716
        ta--;
2717
        return NULL;
2718
}
2719

    
2720
void * handle_destroy(struct peer_req *pr)
2721
{
2722
        struct peerd *peer = pr->peer;
2723
        char *target = xseg_get_target(peer->xseg, pr->req);
2724
        /* request EXCLUSIVE access, but do not force it.
2725
         * check if succeeded on do_destroy
2726
         */
2727
        int r = map_action(do_destroy, pr, target, pr->req->targetlen,
2728
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2729
        if (r < 0)
2730
                fail(peer, pr);
2731
        else
2732
                complete(peer, pr);
2733
        ta--;
2734
        return NULL;
2735
}
2736

    
2737
void * handle_open(struct peer_req *pr)
2738
{
2739
        struct peerd *peer = pr->peer;
2740
        char *target = xseg_get_target(peer->xseg, pr->req);
2741
        //here we do not want to load
2742
        int r = map_action(do_open, pr, target, pr->req->targetlen,
2743
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2744
        if (r < 0)
2745
                fail(peer, pr);
2746
        else
2747
                complete(peer, pr);
2748
        ta--;
2749
        return NULL;
2750
}
2751

    
2752
void * handle_close(struct peer_req *pr)
2753
{
2754
        struct peerd *peer = pr->peer;
2755
        char *target = xseg_get_target(peer->xseg, pr->req);
2756
        //here we do not want to load
2757
        int r = map_action(do_close, pr, target, pr->req->targetlen,
2758
                                MF_ARCHIP|MF_EXCLUSIVE|MF_FORCE);
2759
        if (r < 0)
2760
                fail(peer, pr);
2761
        else
2762
                complete(peer, pr);
2763
        ta--;
2764
        return NULL;
2765
}
2766

    
2767
void * handle_snapshot(struct peer_req *pr)
2768
{
2769
        struct peerd *peer = pr->peer;
2770
        char *target = xseg_get_target(peer->xseg, pr->req);
2771
        /* request EXCLUSIVE access, but do not force it.
2772
         * check if succeeded on do_snapshot
2773
         */
2774
        int r = map_action(do_snapshot, pr, target, pr->req->targetlen,
2775
                                MF_ARCHIP|MF_LOAD|MF_EXCLUSIVE);
2776
        if (r < 0)
2777
                fail(peer, pr);
2778
        else
2779
                complete(peer, pr);
2780
        ta--;
2781
        return NULL;
2782
}
2783

    
2784
int dispatch_accepted(struct peerd *peer, struct peer_req *pr,
2785
                        struct xseg_request *req)
2786
{
2787
        //struct mapperd *mapper = __get_mapperd(peer);
2788
        struct mapper_io *mio = __get_mapper_io(pr);
2789
        void *(*action)(struct peer_req *) = NULL;
2790

    
2791
        mio->state = ACCEPTED;
2792
        mio->err = 0;
2793
        mio->cb = NULL;
2794
        switch (pr->req->op) {
2795
                /* primary xseg operations of mapper */
2796
                case X_CLONE: action = handle_clone; break;
2797
                case X_MAPR: action = handle_mapr; break;
2798
                case X_MAPW: action = handle_mapw; break;
2799
                case X_SNAPSHOT: action = handle_snapshot; break;
2800
                case X_INFO: action = handle_info; break;
2801
                case X_DELETE: action = handle_destroy; break;
2802
                case X_OPEN: action = handle_open; break;
2803
                case X_CLOSE: action = handle_close; break;
2804
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
2805
        }
2806
        if (action){
2807
                ta++;
2808
                mio->active = 1;
2809
                st_thread_create(action, pr, 0, 0);
2810
        }
2811
        return 0;
2812

    
2813
}
2814

    
2815
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
2816
                enum dispatch_reason reason)
2817
{
2818
        struct mapperd *mapper = __get_mapperd(peer);
2819
        (void) mapper;
2820
        struct mapper_io *mio = __get_mapper_io(pr);
2821
        (void) mio;
2822

    
2823

    
2824
        if (reason == dispatch_accept)
2825
                dispatch_accepted(peer, pr, req);
2826
        else {
2827
                if (mio->cb){
2828
                        mio->cb(pr, req);
2829
                } else { 
2830
                        signal_pr(pr);
2831
                }
2832
        }
2833
        return 0;
2834
}
2835

    
2836
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
2837
{
2838
        int i;
2839

    
2840
        //FIXME error checks
2841
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
2842
        peer->priv = mapperd;
2843
        mapper = mapperd;
2844
        mapper->hashmaps = xhash_new(3, 0, STRING);
2845

    
2846
        for (i = 0; i < peer->nr_ops; i++) {
2847
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
2848
                mio->copyups_nodes = xhash_new(3, 0, INTEGER);
2849
                mio->copyups = 0;
2850
                mio->err = 0;
2851
                mio->active = 0;
2852
                peer->peer_reqs[i].priv = mio;
2853
        }
2854

    
2855
        mapper->bportno = -1;
2856
        mapper->mbportno = -1;
2857
        BEGIN_READ_ARGS(argc, argv);
2858
        READ_ARG_ULONG("-bp", mapper->bportno);
2859
        READ_ARG_ULONG("-mbp", mapper->mbportno);
2860
        END_READ_ARGS();
2861
        if (mapper->bportno == -1){
2862
                XSEGLOG2(&lc, E, "Portno for blocker must be provided");
2863
                usage(argv[0]);
2864
                return -1;
2865
        }
2866
        if (mapper->mbportno == -1){
2867
                XSEGLOG2(&lc, E, "Portno for mblocker must be provided");
2868
                usage(argv[0]);
2869
                return -1;
2870
        }
2871

    
2872
        const struct sched_param param = { .sched_priority = 99 };
2873
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
2874
        /* FIXME maybe place it in peer
2875
         * should be done for each port (sportno to eportno)
2876
         */
2877
        xseg_set_max_requests(peer->xseg, peer->portno_start, 5000);
2878
        xseg_set_freequeue_size(peer->xseg, peer->portno_start, 3000, 0);
2879

    
2880

    
2881
//        test_map(peer);
2882

    
2883
        return 0;
2884
}
2885

    
2886
/* FIXME this should not be here */
/*
 * Synchronously poll every port of this peer until expected_req comes
 * back, bypassing the normal dispatch loop (used during finalization
 * when the event loop is no longer running).
 *
 * Any other request received meanwhile is treated as stale: it is
 * responded to (or dropped if no port can be allocated).  The inner
 * loop keeps draining ports as long as anything was received; when all
 * ports are empty we block in xseg_wait_signal and retry.
 *
 * Returns 0 once expected_req has been received.  Never returns
 * otherwise — the caller must guarantee the reply will arrive.
 */
int wait_reply(struct peerd *peer, struct xseg_request *expected_req)
{
        struct xseg *xseg = peer->xseg;
        xport portno_start = peer->portno_start;
        xport portno_end = peer->portno_end;
        struct peer_req *pr;
        xport i;
        int  r, c = 0;
        struct xseg_request *received;
        xseg_prepare_wait(xseg, portno_start);
        while(1) {
                XSEGLOG2(&lc, D, "Attempting to check for reply");
                c = 1;
                /* c tracks whether the previous sweep received anything;
                 * keep sweeping until all ports come up empty. */
                while (c){
                        c = 0;
                        for (i = portno_start; i <= portno_end; i++) {
                                received = xseg_receive(xseg, i, 0);
                                if (received) {
                                        c = 1;
                                        r =  xseg_get_req_data(xseg, received, (void **) &pr);
                                        if (r < 0 || !pr || received != expected_req){
                                                /* Not ours: bounce it back to its origin,
                                                 * or drop it if no port is available. */
                                                XSEGLOG2(&lc, W, "Received request with no pr data\n");
                                                xport p = xseg_respond(peer->xseg, received, peer->portno_start, X_ALLOC);
                                                if (p == NoPort){
                                                        XSEGLOG2(&lc, W, "Could not respond stale request");
                                                        xseg_put_request(xseg, received, portno_start);
                                                        continue;
                                                } else {
                                                        xseg_signal(xseg, p);
                                                }
                                        } else {
                                                /* Got the reply we were waiting for. */
                                                xseg_cancel_wait(xseg, portno_start);
                                                return 0;
                                        }
                                }
                        }
                }
                /* All ports drained without a match; sleep until signalled
                 * (or the 1s timeout elapses) and poll again. */
                xseg_wait_signal(xseg, 1000000UL);
        }
}
2927

    
2928

    
2929
/*
 * Shutdown hook: walk the map cache and release every map this peer
 * holds exclusively, so other peers can take over the volumes.
 *
 * For each exclusively held map a close request is issued via
 * __close_map and its reply is awaited synchronously with wait_reply
 * (the normal dispatch loop is no longer running at this point).
 * Failures are logged but do not abort the walk.
 */
void custom_peer_finalize(struct peerd *peer)
{
        struct mapperd *mapper = __get_mapperd(peer);
        struct peer_req *pr = alloc_peer_req(peer);
        if (!pr){
                XSEGLOG2(&lc, E, "Cannot get peer request");
                return;
        }
        struct map *map;
        struct xseg_request *req;
        xhash_iter_t it;
        xhashidx key, val;
        xhash_iter_init(mapper->hashmaps, &it);
        while (xhash_iterate(mapper->hashmaps, &it, &key, &val)){
                map = (struct map *)val;
                /* Only exclusively held maps need an explicit close. */
                if (!(map->flags & MF_MAP_EXCLUSIVE))
                        continue;
                req = __close_map(pr, map);
                if (!req)
                        continue;
                /* Block until the close request completes. */
                wait_reply(peer, req);
                if (!(req->state & XS_SERVED))
                        XSEGLOG2(&lc, E, "Couldn't close map %s", map->volume);
                map->flags &= ~MF_MAP_CLOSING;
                xseg_put_request(peer->xseg, req, pr->portno);
        }
        return;
}
2959

    
2960
void print_obj(struct map_node *mn)
2961
{
2962
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2963
                        (unsigned long long) mn->objectidx, mn->object, 
2964
                        (unsigned int) mn->objectlen, 
2965
                        (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2966
}
2967

    
2968
void print_map(struct map *m)
2969
{
2970
        uint64_t nr_objs = m->size/block_size;
2971
        if (m->size % block_size)
2972
                nr_objs++;
2973
        fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu, version: %u\n", 
2974
                        m->volume, m->volumelen, 
2975
                        (unsigned long long) m->size, 
2976
                        (unsigned long long) nr_objs,
2977
                        m->version);
2978
        uint64_t i;
2979
        struct map_node *mn;
2980
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2981
                return;
2982
        for (i = 0; i < nr_objs; i++) {
2983
                mn = find_object(m, i);
2984
                if (!mn){
2985
                        printf("object idx [%llu] not found!\n", (unsigned long long) i);
2986
                        continue;
2987
                }
2988
                print_obj(mn);
2989
        }
2990
}
2991

    
2992
/*
2993
void test_map(struct peerd *peer)
2994
{
2995
        int i,j, ret;
2996
        //struct sha256_ctx sha256ctx;
2997
        unsigned char buf[SHA256_DIGEST_SIZE];
2998
        char buf_new[XSEG_MAX_TARGETLEN + 20];
2999
        struct map *m = malloc(sizeof(struct map));
3000
        strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
3001
        m->volume[XSEG_MAX_TARGETLEN] = 0;
3002
        strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
3003
        buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
3004
        m->volumelen = XSEG_MAX_TARGETLEN;
3005
        m->size = 100*block_size;
3006
        m->objects = xhash_new(3, INTEGER);
3007
        struct map_node *map_node = calloc(100, sizeof(struct map_node));
3008
        for (i = 0; i < 100; i++) {
3009
                sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
3010
                gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
3011
                
3012
                for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
3013
                        sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
3014
                }
3015
                map_node[i].objectidx = i;
3016
                map_node[i].objectlen = XSEG_MAX_TARGETLEN;
3017
                map_node[i].flags = MF_OBJECT_EXIST;
3018
                ret = insert_object(m, &map_node[i]);
3019
        }
3020

3021
        char *data = malloc(block_size);
3022
        mapheader_to_map(m, data);
3023
        uint64_t pos = mapheader_size;
3024

3025
        for (i = 0; i < 100; i++) {
3026
                map_node = find_object(m, i);
3027
                if (!map_node){
3028
                        printf("no object node %d \n", i);
3029
                        exit(1);
3030
                }
3031
                object_to_map(data+pos, map_node);
3032
                pos += objectsize_in_map;
3033
        }
3034
//        print_map(m);
3035

3036
        struct map *m2 = malloc(sizeof(struct map));
3037
        strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
3038
        m->volume[XSEG_MAX_TARGETLEN] = 0;
3039
        m->volumelen = XSEG_MAX_TARGETLEN;
3040

3041
        m2->objects = xhash_new(3, INTEGER);
3042
        ret = read_map(peer, m2, data);
3043
//        print_map(m2);
3044

3045
        int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
3046
        ssize_t r, sum = 0;
3047
        while (sum < block_size) {
3048
                r = write(fd, data + sum, block_size -sum);
3049
                if (r < 0){
3050
                        perror("write");
3051
                        printf("write error\n");
3052
                        exit(1);
3053
                } 
3054
                sum += r;
3055
        }
3056
        close(fd);
3057
        map_node = find_object(m, 0);
3058
        free(map_node);
3059
        free(m);
3060
}
3061
*/