Statistics
| Branch: | Tag: | Revision:

root / xseg / peers / user / mt-mapperd.c @ 2979e589

History | View | Annotate | Download (55.9 kB)

1 7c130c7d Filippos Giannakos
#include <stdio.h>
2 7c130c7d Filippos Giannakos
#include <unistd.h>
3 7c130c7d Filippos Giannakos
#include <sys/types.h>
4 7c130c7d Filippos Giannakos
#include <pthread.h>
5 7c130c7d Filippos Giannakos
#include <xseg/xseg.h>
6 dabec8ad Giannakos Filippos
#include <peer.h>
7 7c130c7d Filippos Giannakos
#include <time.h>
8 7c130c7d Filippos Giannakos
#include <xtypes/xlock.h>
9 7c130c7d Filippos Giannakos
#include <xtypes/xhash.h>
10 7c130c7d Filippos Giannakos
#include <xseg/protocol.h>
11 5ad213f0 Filippos Giannakos
#include <sys/stat.h>
12 5ad213f0 Filippos Giannakos
#include <fcntl.h>
13 6805c197 Giannakos Filippos
#include <gcrypt.h>
14 6805c197 Giannakos Filippos
#include <errno.h>
15 b911a183 Giannakos Filippos
#include <sched.h>
16 b911a183 Giannakos Filippos
#include <sys/syscall.h>
17 6805c197 Giannakos Filippos
18 6805c197 Giannakos Filippos
GCRY_THREAD_OPTION_PTHREAD_IMPL;
19 9d3f16da Giannakos Filippos
#define MF_LOAD         (1 << 0)
20 9d3f16da Giannakos Filippos
#define MF_EXCLUSIVE         (1 << 1)
21 9d3f16da Giannakos Filippos
#define MF_FORCE         (1 << 2)
22 7c130c7d Filippos Giannakos
23 6805c197 Giannakos Filippos
#define SHA256_DIGEST_SIZE 32
24 40e56a42 Filippos Giannakos
/* hex representation of sha256 value takes up double the sha256 size */
25 65062a91 Filippos Giannakos
#define HEXLIFIED_SHA256_DIGEST_SIZE (SHA256_DIGEST_SIZE << 1)
26 65062a91 Filippos Giannakos
27 65062a91 Filippos Giannakos
#define block_size (1<<22) //FIXME this should be defined here?
28 65062a91 Filippos Giannakos
#define objectsize_in_map (1 + XSEG_MAX_TARGETLEN) /* transparency byte + max object len */
29 40e56a42 Filippos Giannakos
#define mapheader_size (SHA256_DIGEST_SIZE + (sizeof(uint64_t)) ) /* magic hash value  + volume size */
30 7c130c7d Filippos Giannakos
31 7c130c7d Filippos Giannakos
#define MF_OBJECT_EXIST                (1 << 0)
32 7c130c7d Filippos Giannakos
#define MF_OBJECT_COPYING        (1 << 1)
33 d85c1e11 Filippos Giannakos
#define MF_OBJECT_WRITING        (1 << 2)
34 c04d4786 Filippos Giannakos
#define MF_OBJECT_DELETING        (1 << 3)
35 9d3f16da Giannakos Filippos
#define MF_OBJECT_DELETED        (1 << 4)
36 9d3f16da Giannakos Filippos
#define MF_OBJECT_DESTROYED        (1 << 5)
37 d85c1e11 Filippos Giannakos
38 c04d4786 Filippos Giannakos
#define MF_OBJECT_NOT_READY        (MF_OBJECT_COPYING|MF_OBJECT_WRITING|MF_OBJECT_DELETING)
39 7c130c7d Filippos Giannakos
40 7c130c7d Filippos Giannakos
char *magic_string = "This a magic string. Please hash me";
41 40e56a42 Filippos Giannakos
unsigned char magic_sha256[SHA256_DIGEST_SIZE];        /* sha256 hash value of magic string */
42 65062a91 Filippos Giannakos
char zero_block[HEXLIFIED_SHA256_DIGEST_SIZE + 1]; /* hexlified sha256 hash value of a block full of zeros */
43 7c130c7d Filippos Giannakos
44 b0bcccba Giannakos Filippos
//dispatch_internal mapper states
45 d85c1e11 Filippos Giannakos
enum mapper_state {
46 d85c1e11 Filippos Giannakos
        ACCEPTED = 0,
47 d85c1e11 Filippos Giannakos
        WRITING = 1,
48 c04d4786 Filippos Giannakos
        COPYING = 2,
49 9f27419f Giannakos Filippos
        DELETING = 3,
50 9f27419f Giannakos Filippos
        DROPPING_CACHE = 4
51 d85c1e11 Filippos Giannakos
};
52 d85c1e11 Filippos Giannakos
53 9d3f16da Giannakos Filippos
typedef void (*cb_t)(struct peer_req *pr, struct xseg_request *req);
54 9d3f16da Giannakos Filippos
55 7c130c7d Filippos Giannakos
struct map_node {
56 7c130c7d Filippos Giannakos
        uint32_t flags;
57 7c130c7d Filippos Giannakos
        uint32_t objectidx;
58 40e56a42 Filippos Giannakos
        uint32_t objectlen;
59 65062a91 Filippos Giannakos
        char object[XSEG_MAX_TARGETLEN + 1];         /* NULL terminated string */
60 d85c1e11 Filippos Giannakos
        struct map *map;
61 9d3f16da Giannakos Filippos
        uint32_t ref;
62 9d3f16da Giannakos Filippos
        uint32_t waiters;
63 9d3f16da Giannakos Filippos
        st_cond_t cond;
64 7c130c7d Filippos Giannakos
};
65 7c130c7d Filippos Giannakos
66 40e56a42 Filippos Giannakos
#define MF_MAP_LOADING                (1 << 0)
67 40e56a42 Filippos Giannakos
#define MF_MAP_DESTROYED        (1 << 1)
68 d85c1e11 Filippos Giannakos
#define MF_MAP_WRITING                (1 << 2)
69 c04d4786 Filippos Giannakos
#define MF_MAP_DELETING                (1 << 3)
70 9f27419f Giannakos Filippos
#define MF_MAP_DROPPING_CACHE        (1 << 4)
71 9d3f16da Giannakos Filippos
#define MF_MAP_EXCLUSIVE        (1 << 5)
72 9d3f16da Giannakos Filippos
#define MF_MAP_OPENING                (1 << 6)
73 9d3f16da Giannakos Filippos
#define MF_MAP_CLOSING                (1 << 7)
74 d85c1e11 Filippos Giannakos
75 9f27419f Giannakos Filippos
#define MF_MAP_NOT_READY        (MF_MAP_LOADING|MF_MAP_WRITING|MF_MAP_DELETING|\
76 9d3f16da Giannakos Filippos
                                        MF_MAP_DROPPING_CACHE|MF_MAP_OPENING)
77 9d3f16da Giannakos Filippos
78 9d3f16da Giannakos Filippos
#define wait_on_pr(__pr, __condition__)         \
79 9d3f16da Giannakos Filippos
        while (__condition__){                        \
80 9d3f16da Giannakos Filippos
                ta--;                                \
81 9d3f16da Giannakos Filippos
                __get_mapper_io(pr)->active = 0;\
82 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "Waiting on pr %lx, ta: %u",  pr, ta); \
83 9d3f16da Giannakos Filippos
                st_cond_wait(__pr->cond);        \
84 9d3f16da Giannakos Filippos
        }
85 9d3f16da Giannakos Filippos
86 9d3f16da Giannakos Filippos
#define wait_on_mapnode(__mn, __condition__)        \
87 9d3f16da Giannakos Filippos
        while (__condition__){                        \
88 9d3f16da Giannakos Filippos
                ta--;                                \
89 9d3f16da Giannakos Filippos
                __mn->waiters++;                \
90 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "Waiting on map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
91 9d3f16da Giannakos Filippos
                st_cond_wait(__mn->cond);        \
92 9d3f16da Giannakos Filippos
        }
93 9d3f16da Giannakos Filippos
94 9d3f16da Giannakos Filippos
#define wait_on_map(__map, __condition__)        \
95 9d3f16da Giannakos Filippos
        while (__condition__){                        \
96 9d3f16da Giannakos Filippos
                ta--;                                \
97 9d3f16da Giannakos Filippos
                __map->waiters++;                \
98 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "Waiting on map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
99 9d3f16da Giannakos Filippos
                st_cond_wait(__map->cond);        \
100 9d3f16da Giannakos Filippos
        }
101 9d3f16da Giannakos Filippos
102 9d3f16da Giannakos Filippos
#define signal_pr(__pr)                                \
103 9d3f16da Giannakos Filippos
        do {                                         \
104 9d3f16da Giannakos Filippos
                if (!__get_mapper_io(pr)->active){\
105 9d3f16da Giannakos Filippos
                        ta++;                        \
106 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, D, "Signaling  pr %lx, ta: %u",  pr, ta); \
107 9d3f16da Giannakos Filippos
                        __get_mapper_io(pr)->active = 1;\
108 9d3f16da Giannakos Filippos
                        st_cond_signal(__pr->cond);        \
109 9d3f16da Giannakos Filippos
                }                                \
110 9d3f16da Giannakos Filippos
        }while(0)
111 9d3f16da Giannakos Filippos
112 9d3f16da Giannakos Filippos
#define signal_map(__map)                        \
113 9d3f16da Giannakos Filippos
        do {                                         \
114 9d3f16da Giannakos Filippos
                if (__map->waiters) {                \
115 9d3f16da Giannakos Filippos
                        ta += 1;                \
116 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, D, "Signaling map %lx %s, waiters: %u, ta: %u",  __map, __map->volume, __map->waiters, ta); \
117 9d3f16da Giannakos Filippos
                        __map->waiters--;        \
118 9d3f16da Giannakos Filippos
                        st_cond_signal(__map->cond);        \
119 9d3f16da Giannakos Filippos
                }                                \
120 9d3f16da Giannakos Filippos
        }while(0)
121 9d3f16da Giannakos Filippos
122 9d3f16da Giannakos Filippos
#define signal_mapnode(__mn)                        \
123 9d3f16da Giannakos Filippos
        do {                                         \
124 9d3f16da Giannakos Filippos
                if (__mn->waiters) {                \
125 9d3f16da Giannakos Filippos
                        ta += __mn->waiters;        \
126 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, D, "Signaling map node %lx %s, waiters: %u, ta: %u",  __mn, __mn->object, __mn->waiters, ta); \
127 9d3f16da Giannakos Filippos
                        __mn->waiters = 0;        \
128 9d3f16da Giannakos Filippos
                        st_cond_broadcast(__mn->cond);        \
129 9d3f16da Giannakos Filippos
                }                                \
130 9d3f16da Giannakos Filippos
        }while(0)
131 9d3f16da Giannakos Filippos
132 7c130c7d Filippos Giannakos
133 7c130c7d Filippos Giannakos
struct map {
134 7c130c7d Filippos Giannakos
        uint32_t flags;
135 7c130c7d Filippos Giannakos
        uint64_t size;
136 7c130c7d Filippos Giannakos
        uint32_t volumelen;
137 65062a91 Filippos Giannakos
        char volume[XSEG_MAX_TARGETLEN + 1]; /* NULL terminated string */
138 40e56a42 Filippos Giannakos
        xhash_t *objects;         /* obj_index --> map_node */
139 9d3f16da Giannakos Filippos
        uint32_t ref;
140 9d3f16da Giannakos Filippos
        uint32_t waiters;
141 9d3f16da Giannakos Filippos
        st_cond_t cond;
142 7c130c7d Filippos Giannakos
};
143 7c130c7d Filippos Giannakos
144 7c130c7d Filippos Giannakos
struct mapperd {
145 e155e2f9 Giannakos Filippos
        xport bportno;                /* blocker that accesses data */
146 e155e2f9 Giannakos Filippos
        xport mbportno;                /* blocker that accesses maps */
147 7c130c7d Filippos Giannakos
        xhash_t *hashmaps; // hash_function(target) --> struct map
148 7c130c7d Filippos Giannakos
};
149 7c130c7d Filippos Giannakos
150 7c130c7d Filippos Giannakos
struct mapper_io {
151 40e56a42 Filippos Giannakos
        volatile uint32_t copyups;        /* nr of copyups pending, issued by this mapper io */
152 40e56a42 Filippos Giannakos
        xhash_t *copyups_nodes;                /* hash map (xseg_request) --> (corresponding map_node of copied up object)*/
153 d85c1e11 Filippos Giannakos
        struct map_node *copyup_node;
154 9d3f16da Giannakos Filippos
        volatile int err;                        /* error flag */
155 9d3f16da Giannakos Filippos
        volatile uint64_t del_pending;
156 c04d4786 Filippos Giannakos
        uint64_t delobj;
157 9f27419f Giannakos Filippos
        uint64_t dcobj;
158 9d3f16da Giannakos Filippos
        cb_t cb;
159 d85c1e11 Filippos Giannakos
        enum mapper_state state;
160 9d3f16da Giannakos Filippos
        volatile int active;
161 7c130c7d Filippos Giannakos
};
162 7c130c7d Filippos Giannakos
163 9d3f16da Giannakos Filippos
/* global vars */
164 9d3f16da Giannakos Filippos
struct mapperd *mapper;
165 9d3f16da Giannakos Filippos
166 ca18b3c1 Filippos Giannakos
void print_map(struct map *m);
167 ca18b3c1 Filippos Giannakos
168 40e56a42 Filippos Giannakos
/*
169 40e56a42 Filippos Giannakos
 * Helper functions
170 40e56a42 Filippos Giannakos
 */
171 40e56a42 Filippos Giannakos
172 7c130c7d Filippos Giannakos
static inline struct mapperd * __get_mapperd(struct peerd *peer)
173 7c130c7d Filippos Giannakos
{
174 7c130c7d Filippos Giannakos
        return (struct mapperd *) peer->priv;
175 7c130c7d Filippos Giannakos
}
176 7c130c7d Filippos Giannakos
177 7c130c7d Filippos Giannakos
static inline struct mapper_io * __get_mapper_io(struct peer_req *pr)
178 7c130c7d Filippos Giannakos
{
179 7c130c7d Filippos Giannakos
        return (struct mapper_io *) pr->priv;
180 7c130c7d Filippos Giannakos
}
181 7c130c7d Filippos Giannakos
182 40e56a42 Filippos Giannakos
static inline uint64_t calc_map_obj(struct map *map)
183 40e56a42 Filippos Giannakos
{
184 9d3f16da Giannakos Filippos
        if (map->size == -1)
185 9d3f16da Giannakos Filippos
                return 0;
186 40e56a42 Filippos Giannakos
        uint64_t nr_objs = map->size / block_size;
187 40e56a42 Filippos Giannakos
        if (map->size % block_size)
188 40e56a42 Filippos Giannakos
                nr_objs++;
189 40e56a42 Filippos Giannakos
        return nr_objs;
190 40e56a42 Filippos Giannakos
}
191 40e56a42 Filippos Giannakos
192 40e56a42 Filippos Giannakos
static uint32_t calc_nr_obj(struct xseg_request *req)
193 40e56a42 Filippos Giannakos
{
194 40e56a42 Filippos Giannakos
        unsigned int r = 1;
195 40e56a42 Filippos Giannakos
        uint64_t rem_size = req->size;
196 40e56a42 Filippos Giannakos
        uint64_t obj_offset = req->offset & (block_size -1); //modulo
197 d85c1e11 Filippos Giannakos
        uint64_t obj_size =  (rem_size + obj_offset > block_size) ? block_size - obj_offset : rem_size;
198 40e56a42 Filippos Giannakos
        rem_size -= obj_size;
199 40e56a42 Filippos Giannakos
        while (rem_size > 0) {
200 d85c1e11 Filippos Giannakos
                obj_size = (rem_size > block_size) ? block_size : rem_size;
201 40e56a42 Filippos Giannakos
                rem_size -= obj_size;
202 40e56a42 Filippos Giannakos
                r++;
203 40e56a42 Filippos Giannakos
        }
204 40e56a42 Filippos Giannakos
205 40e56a42 Filippos Giannakos
        return r;
206 40e56a42 Filippos Giannakos
}
207 40e56a42 Filippos Giannakos
208 40e56a42 Filippos Giannakos
/*
209 40e56a42 Filippos Giannakos
 * Maps handling functions
210 40e56a42 Filippos Giannakos
 */
211 11d6a7ff Giannakos Filippos
//FIXME assert targetlen > 0
212 11d6a7ff Giannakos Filippos
213 40e56a42 Filippos Giannakos
214 40e56a42 Filippos Giannakos
static struct map * find_map(struct mapperd *mapper, char *target, uint32_t targetlen)
215 7c130c7d Filippos Giannakos
{
216 7c130c7d Filippos Giannakos
        int r;
217 5ad213f0 Filippos Giannakos
        struct map *m = NULL;
218 65062a91 Filippos Giannakos
        char buf[XSEG_MAX_TARGETLEN+1];
219 65062a91 Filippos Giannakos
        //assert targetlen <= XSEG_MAX_TARGETLEN
220 7c130c7d Filippos Giannakos
        strncpy(buf, target, targetlen);
221 7c130c7d Filippos Giannakos
        buf[targetlen] = 0;
222 9674f63d Giannakos Filippos
        XSEGLOG2(&lc, D, "looking up map %s, len %u", buf, targetlen);
223 7c130c7d Filippos Giannakos
        r = xhash_lookup(mapper->hashmaps, (xhashidx) buf, (xhashidx *) &m);
224 5ad213f0 Filippos Giannakos
        if (r < 0)
225 7c130c7d Filippos Giannakos
                return NULL;
226 7c130c7d Filippos Giannakos
        return m;
227 7c130c7d Filippos Giannakos
}
228 7c130c7d Filippos Giannakos
229 7c130c7d Filippos Giannakos
230 7c130c7d Filippos Giannakos
static int insert_map(struct mapperd *mapper, struct map *map)
231 7c130c7d Filippos Giannakos
{
232 7c130c7d Filippos Giannakos
        int r = -1;
233 7c130c7d Filippos Giannakos
        
234 40e56a42 Filippos Giannakos
        if (find_map(mapper, map->volume, map->volumelen)){
235 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, W, "Map %s found in hash maps", map->volume);
236 7c130c7d Filippos Giannakos
                goto out;
237 5ad213f0 Filippos Giannakos
        }
238 6d417535 Giannakos Filippos
239 9674f63d Giannakos Filippos
        XSEGLOG2(&lc, D, "Inserting map %s, len: %d (map: %lx)", 
240 6af5c5c6 Giannakos Filippos
                        map->volume, strlen(map->volume), (unsigned long) map);
241 7c130c7d Filippos Giannakos
        r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
242 6af5c5c6 Giannakos Filippos
        while (r == -XHASH_ERESIZE) {
243 6af5c5c6 Giannakos Filippos
                xhashidx shift = xhash_grow_size_shift(mapper->hashmaps);
244 7c130c7d Filippos Giannakos
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
245 feeaaf17 Filippos Giannakos
                if (!new_hashmap){
246 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, E, "Cannot grow mapper->hashmaps to sizeshift %llu",
247 feeaaf17 Filippos Giannakos
                                        (unsigned long long) shift);
248 7c130c7d Filippos Giannakos
                        goto out;
249 feeaaf17 Filippos Giannakos
                }
250 7c130c7d Filippos Giannakos
                mapper->hashmaps = new_hashmap;
251 7c130c7d Filippos Giannakos
                r = xhash_insert(mapper->hashmaps, (xhashidx) map->volume, (xhashidx) map);
252 7c130c7d Filippos Giannakos
        }
253 7c130c7d Filippos Giannakos
out:
254 7c130c7d Filippos Giannakos
        return r;
255 7c130c7d Filippos Giannakos
}
256 7c130c7d Filippos Giannakos
257 7c130c7d Filippos Giannakos
static int remove_map(struct mapperd *mapper, struct map *map)
258 7c130c7d Filippos Giannakos
{
259 7c130c7d Filippos Giannakos
        int r = -1;
260 7c130c7d Filippos Giannakos
        
261 7c130c7d Filippos Giannakos
        //assert no pending pr on map
262 7c130c7d Filippos Giannakos
        
263 7c130c7d Filippos Giannakos
        r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
264 6af5c5c6 Giannakos Filippos
        while (r == -XHASH_ERESIZE) {
265 6af5c5c6 Giannakos Filippos
                xhashidx shift = xhash_shrink_size_shift(mapper->hashmaps);
266 7c130c7d Filippos Giannakos
                xhash_t *new_hashmap = xhash_resize(mapper->hashmaps, shift, NULL);
267 feeaaf17 Filippos Giannakos
                if (!new_hashmap){
268 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, E, "Cannot shrink mapper->hashmaps to sizeshift %llu",
269 feeaaf17 Filippos Giannakos
                                        (unsigned long long) shift);
270 7c130c7d Filippos Giannakos
                        goto out;
271 feeaaf17 Filippos Giannakos
                }
272 7c130c7d Filippos Giannakos
                mapper->hashmaps = new_hashmap;
273 7c130c7d Filippos Giannakos
                r = xhash_delete(mapper->hashmaps, (xhashidx) map->volume);
274 7c130c7d Filippos Giannakos
        }
275 7c130c7d Filippos Giannakos
out:
276 7c130c7d Filippos Giannakos
        return r;
277 7c130c7d Filippos Giannakos
}
278 7c130c7d Filippos Giannakos
279 9d3f16da Giannakos Filippos
static struct xseg_request * __close_map(struct peer_req *pr, struct map *map)
280 7c130c7d Filippos Giannakos
{
281 7c130c7d Filippos Giannakos
        int r;
282 7c130c7d Filippos Giannakos
        xport p;
283 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
284 7c130c7d Filippos Giannakos
        struct xseg_request *req;
285 5ad213f0 Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
286 7c130c7d Filippos Giannakos
        void *dummy;
287 7c130c7d Filippos Giannakos
288 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Closing map %s", map->volume);
289 7c130c7d Filippos Giannakos
290 cc21de66 Giannakos Filippos
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
291 feeaaf17 Filippos Giannakos
        if (!req){
292 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
293 9d3f16da Giannakos Filippos
                                map->volume);
294 9d3f16da Giannakos Filippos
                goto out_err;
295 feeaaf17 Filippos Giannakos
        }
296 7c130c7d Filippos Giannakos
297 9d3f16da Giannakos Filippos
        r = xseg_prep_request(peer->xseg, req, map->volumelen, block_size);
298 feeaaf17 Filippos Giannakos
        if (r < 0){
299 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
300 9d3f16da Giannakos Filippos
                                map->volume);
301 7c130c7d Filippos Giannakos
                goto out_put;
302 feeaaf17 Filippos Giannakos
        }
303 7c130c7d Filippos Giannakos
304 7c130c7d Filippos Giannakos
        char *reqtarget = xseg_get_target(peer->xseg, req);
305 7c130c7d Filippos Giannakos
        if (!reqtarget)
306 7c130c7d Filippos Giannakos
                goto out_put;
307 9d3f16da Giannakos Filippos
        strncpy(reqtarget, map->volume, req->targetlen);
308 9d3f16da Giannakos Filippos
        req->op = X_CLOSE;
309 7c130c7d Filippos Giannakos
        req->size = block_size;
310 7c130c7d Filippos Giannakos
        req->offset = 0;
311 7c130c7d Filippos Giannakos
        r = xseg_set_req_data(peer->xseg, req, pr);
312 feeaaf17 Filippos Giannakos
        if (r < 0){
313 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
314 9d3f16da Giannakos Filippos
                                map->volume);
315 7c130c7d Filippos Giannakos
                goto out_put;
316 feeaaf17 Filippos Giannakos
        }
317 cc21de66 Giannakos Filippos
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
318 feeaaf17 Filippos Giannakos
        if (p == NoPort){ 
319 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
320 9d3f16da Giannakos Filippos
                                map->volume);
321 7c130c7d Filippos Giannakos
                goto out_unset;
322 feeaaf17 Filippos Giannakos
        }
323 7c130c7d Filippos Giannakos
        r = xseg_signal(peer->xseg, p);
324 7c130c7d Filippos Giannakos
        
325 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map %s closing", map->volume);
326 9d3f16da Giannakos Filippos
        return req;
327 7c130c7d Filippos Giannakos
328 7c130c7d Filippos Giannakos
out_unset:
329 7c130c7d Filippos Giannakos
        xseg_get_req_data(peer->xseg, req, &dummy);
330 7c130c7d Filippos Giannakos
out_put:
331 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
332 7c130c7d Filippos Giannakos
out_err:
333 9d3f16da Giannakos Filippos
        return NULL;
334 9d3f16da Giannakos Filippos
}
335 7c130c7d Filippos Giannakos
336 9d3f16da Giannakos Filippos
static int close_map(struct peer_req *pr, struct map *map)
337 9d3f16da Giannakos Filippos
{
338 9d3f16da Giannakos Filippos
        int err;
339 9d3f16da Giannakos Filippos
        struct xseg_request *req;
340 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
341 9d3f16da Giannakos Filippos
342 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_CLOSING;
343 9d3f16da Giannakos Filippos
        req = __close_map(pr, map);
344 9d3f16da Giannakos Filippos
        if (!req)
345 9d3f16da Giannakos Filippos
                return -1;
346 9d3f16da Giannakos Filippos
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
347 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_CLOSING;
348 9d3f16da Giannakos Filippos
        err = req->state & XS_FAILED;
349 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
350 9d3f16da Giannakos Filippos
        if (err)
351 9d3f16da Giannakos Filippos
                return -1;
352 7c130c7d Filippos Giannakos
        return 0;
353 7c130c7d Filippos Giannakos
}
354 7c130c7d Filippos Giannakos
355 9d3f16da Giannakos Filippos
/*
356 40e56a42 Filippos Giannakos
static int find_or_load_map(struct peerd *peer, struct peer_req *pr, 
357 7c130c7d Filippos Giannakos
                                char *target, uint32_t targetlen, struct map **m)
358 7c130c7d Filippos Giannakos
{
359 5ad213f0 Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
360 7c130c7d Filippos Giannakos
        int r;
361 7c130c7d Filippos Giannakos
        *m = find_map(mapper, target, targetlen);
362 7c130c7d Filippos Giannakos
        if (*m) {
363 6af5c5c6 Giannakos Filippos
                XSEGLOG2(&lc, D, "Found map %s (%u)", (*m)->volume, (unsigned long) *m);
364 d85c1e11 Filippos Giannakos
                if ((*m)->flags & MF_MAP_NOT_READY) {
365 7c130c7d Filippos Giannakos
                        __xq_append_tail(&(*m)->pending, (xqindex) pr);
366 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, I, "Map %s found and not ready", (*m)->volume);
367 40e56a42 Filippos Giannakos
                        return MF_PENDING;
368 d85c1e11 Filippos Giannakos
                //} else if ((*m)->flags & MF_MAP_DESTROYED){
369 d85c1e11 Filippos Giannakos
                //        return -1;
370 d85c1e11 Filippos Giannakos
                // 
371 d85c1e11 Filippos Giannakos
                }else {
372 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, I, "Map %s found", (*m)->volume);
373 7c130c7d Filippos Giannakos
                        return 0;
374 7c130c7d Filippos Giannakos
                }
375 7c130c7d Filippos Giannakos
        }
376 9d3f16da Giannakos Filippos
        r = open_map(peer, pr, target, targetlen, 0);
377 7c130c7d Filippos Giannakos
        if (r < 0)
378 7c130c7d Filippos Giannakos
                return -1; //error
379 40e56a42 Filippos Giannakos
        return MF_PENDING;        
380 7c130c7d Filippos Giannakos
}
381 9d3f16da Giannakos Filippos
*/
382 40e56a42 Filippos Giannakos
/*
383 40e56a42 Filippos Giannakos
 * Object handling functions
384 40e56a42 Filippos Giannakos
 */
385 7c130c7d Filippos Giannakos
386 7c130c7d Filippos Giannakos
struct map_node *find_object(struct map *map, uint64_t obj_index)
387 7c130c7d Filippos Giannakos
{
388 7c130c7d Filippos Giannakos
        struct map_node *mn;
389 7c130c7d Filippos Giannakos
        int r = xhash_lookup(map->objects, obj_index, (xhashidx *) &mn);
390 5ad213f0 Filippos Giannakos
        if (r < 0)
391 7c130c7d Filippos Giannakos
                return NULL;
392 7c130c7d Filippos Giannakos
        return mn;
393 7c130c7d Filippos Giannakos
}
394 7c130c7d Filippos Giannakos
395 7c130c7d Filippos Giannakos
static int insert_object(struct map *map, struct map_node *mn)
396 7c130c7d Filippos Giannakos
{
397 7c130c7d Filippos Giannakos
        //FIXME no find object first
398 7c130c7d Filippos Giannakos
        int r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
399 7c130c7d Filippos Giannakos
        if (r == -XHASH_ERESIZE) {
400 7c130c7d Filippos Giannakos
                unsigned long shift = xhash_grow_size_shift(map->objects);
401 7c130c7d Filippos Giannakos
                map->objects = xhash_resize(map->objects, shift, NULL);
402 7c130c7d Filippos Giannakos
                if (!map->objects)
403 7c130c7d Filippos Giannakos
                        return -1;
404 7c130c7d Filippos Giannakos
                r = xhash_insert(map->objects, mn->objectidx, (xhashidx) mn);
405 7c130c7d Filippos Giannakos
        }
406 7c130c7d Filippos Giannakos
        return r;
407 7c130c7d Filippos Giannakos
}
408 7c130c7d Filippos Giannakos
409 40e56a42 Filippos Giannakos
410 40e56a42 Filippos Giannakos
/*
411 40e56a42 Filippos Giannakos
 * map read/write functions 
412 40e56a42 Filippos Giannakos
 */
413 2ddecabe Giannakos Filippos
static inline void pithosmap_to_object(struct map_node *mn, unsigned char *buf)
414 40e56a42 Filippos Giannakos
{
415 40e56a42 Filippos Giannakos
        int i;
416 40e56a42 Filippos Giannakos
        //hexlify sha256 value
417 40e56a42 Filippos Giannakos
        for (i = 0; i < SHA256_DIGEST_SIZE; i++) {
418 2ddecabe Giannakos Filippos
                sprintf(mn->object+2*i, "%02x", buf[i]);
419 40e56a42 Filippos Giannakos
        }
420 40e56a42 Filippos Giannakos
421 65062a91 Filippos Giannakos
        mn->object[SHA256_DIGEST_SIZE * 2] = 0;
422 65062a91 Filippos Giannakos
        mn->objectlen = SHA256_DIGEST_SIZE * 2;
423 2ddecabe Giannakos Filippos
        mn->flags = MF_OBJECT_EXIST;
424 40e56a42 Filippos Giannakos
}
425 40e56a42 Filippos Giannakos
426 40e56a42 Filippos Giannakos
static inline void map_to_object(struct map_node *mn, char *buf)
427 40e56a42 Filippos Giannakos
{
428 40e56a42 Filippos Giannakos
        char c = buf[0];
429 40e56a42 Filippos Giannakos
        mn->flags = 0;
430 40e56a42 Filippos Giannakos
        if (c)
431 40e56a42 Filippos Giannakos
                mn->flags |= MF_OBJECT_EXIST;
432 65062a91 Filippos Giannakos
        memcpy(mn->object, buf+1, XSEG_MAX_TARGETLEN);
433 65062a91 Filippos Giannakos
        mn->object[XSEG_MAX_TARGETLEN] = 0;
434 40e56a42 Filippos Giannakos
        mn->objectlen = strlen(mn->object);
435 40e56a42 Filippos Giannakos
}
436 40e56a42 Filippos Giannakos
437 7c130c7d Filippos Giannakos
static inline void object_to_map(char* buf, struct map_node *mn)
438 7c130c7d Filippos Giannakos
{
439 7c130c7d Filippos Giannakos
        buf[0] = (mn->flags & MF_OBJECT_EXIST)? 1 : 0;
440 7c130c7d Filippos Giannakos
        memcpy(buf+1, mn->object, mn->objectlen);
441 65062a91 Filippos Giannakos
        memset(buf+1+mn->objectlen, 0, XSEG_MAX_TARGETLEN - mn->objectlen); //zero out the rest of the buffer
442 7c130c7d Filippos Giannakos
}
443 7c130c7d Filippos Giannakos
444 40e56a42 Filippos Giannakos
static inline void mapheader_to_map(struct map *m, char *buf)
445 40e56a42 Filippos Giannakos
{
446 40e56a42 Filippos Giannakos
        uint64_t pos = 0;
447 40e56a42 Filippos Giannakos
        memcpy(buf + pos, magic_sha256, SHA256_DIGEST_SIZE);
448 40e56a42 Filippos Giannakos
        pos += SHA256_DIGEST_SIZE;
449 40e56a42 Filippos Giannakos
        memcpy(buf + pos, &m->size, sizeof(m->size));
450 40e56a42 Filippos Giannakos
        pos += sizeof(m->size);
451 40e56a42 Filippos Giannakos
}
452 40e56a42 Filippos Giannakos
453 40e56a42 Filippos Giannakos
454 9d3f16da Giannakos Filippos
static struct xseg_request * object_write(struct peerd *peer, struct peer_req *pr, 
455 d85c1e11 Filippos Giannakos
                                struct map *map, struct map_node *mn)
456 7c130c7d Filippos Giannakos
{
457 7c130c7d Filippos Giannakos
        void *dummy;
458 7c130c7d Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
459 cc21de66 Giannakos Filippos
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno,
460 e155e2f9 Giannakos Filippos
                                                        mapper->mbportno, X_ALLOC);
461 feeaaf17 Filippos Giannakos
        if (!req){
462 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
463 feeaaf17 Filippos Giannakos
                                "(Map: %s [%llu]",
464 feeaaf17 Filippos Giannakos
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
465 7c130c7d Filippos Giannakos
                goto out_err;
466 feeaaf17 Filippos Giannakos
        }
467 6d417535 Giannakos Filippos
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, objectsize_in_map);
468 feeaaf17 Filippos Giannakos
        if (r < 0){
469 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot allocate request for object %s. \n\t"
470 feeaaf17 Filippos Giannakos
                                "(Map: %s [%llu]",
471 feeaaf17 Filippos Giannakos
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
472 7c130c7d Filippos Giannakos
                goto out_put;
473 feeaaf17 Filippos Giannakos
        }
474 7c130c7d Filippos Giannakos
        char *target = xseg_get_target(peer->xseg, req);
475 3602502b Giannakos Filippos
        strncpy(target, map->volume, req->targetlen);
476 7c130c7d Filippos Giannakos
        req->size = objectsize_in_map;
477 40e56a42 Filippos Giannakos
        req->offset = mapheader_size + mn->objectidx * objectsize_in_map;
478 7c130c7d Filippos Giannakos
        req->op = X_WRITE;
479 7c130c7d Filippos Giannakos
        char *data = xseg_get_data(peer->xseg, req);
480 7c130c7d Filippos Giannakos
        object_to_map(data, mn);
481 7c130c7d Filippos Giannakos
482 7c130c7d Filippos Giannakos
        r = xseg_set_req_data(peer->xseg, req, pr);
483 feeaaf17 Filippos Giannakos
        if (r < 0){
484 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot set request data for object %s. \n\t"
485 feeaaf17 Filippos Giannakos
                                "(Map: %s [%llu]",
486 feeaaf17 Filippos Giannakos
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
487 7c130c7d Filippos Giannakos
                goto out_put;
488 feeaaf17 Filippos Giannakos
        }
489 cc21de66 Giannakos Filippos
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
490 feeaaf17 Filippos Giannakos
        if (p == NoPort){
491 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot submit request for object %s. \n\t"
492 feeaaf17 Filippos Giannakos
                                "(Map: %s [%llu]",
493 feeaaf17 Filippos Giannakos
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
494 7c130c7d Filippos Giannakos
                goto out_unset;
495 feeaaf17 Filippos Giannakos
        }
496 7c130c7d Filippos Giannakos
        r = xseg_signal(peer->xseg, p);
497 feeaaf17 Filippos Giannakos
        if (r < 0)
498 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
499 feeaaf17 Filippos Giannakos
500 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, I, "Writing object %s \n\t"
501 feeaaf17 Filippos Giannakos
                        "Map: %s [%llu]",
502 feeaaf17 Filippos Giannakos
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
503 7c130c7d Filippos Giannakos
504 9d3f16da Giannakos Filippos
        return req;
505 7c130c7d Filippos Giannakos
506 7c130c7d Filippos Giannakos
out_unset:
507 7c130c7d Filippos Giannakos
        xseg_get_req_data(peer->xseg, req, &dummy);
508 7c130c7d Filippos Giannakos
out_put:
509 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
510 7c130c7d Filippos Giannakos
out_err:
511 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, E, "Object write for object %s failed. \n\t"
512 feeaaf17 Filippos Giannakos
                        "(Map: %s [%llu]",
513 feeaaf17 Filippos Giannakos
                        mn->object, map->volume, (unsigned long long) mn->objectidx);
514 9d3f16da Giannakos Filippos
        return NULL;
515 7c130c7d Filippos Giannakos
}
516 7c130c7d Filippos Giannakos
517 9d3f16da Giannakos Filippos
static struct xseg_request * __write_map(struct peer_req* pr, struct map *map)
518 7c130c7d Filippos Giannakos
{
519 7c130c7d Filippos Giannakos
        void *dummy;
520 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
521 7c130c7d Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
522 7c130c7d Filippos Giannakos
        struct map_node *mn;
523 40e56a42 Filippos Giannakos
        uint64_t i, pos, max_objidx = calc_map_obj(map);
524 cc21de66 Giannakos Filippos
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
525 e155e2f9 Giannakos Filippos
                                                        mapper->mbportno, X_ALLOC);
526 feeaaf17 Filippos Giannakos
        if (!req){
527 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s", map->volume);
528 7c130c7d Filippos Giannakos
                goto out_err;
529 feeaaf17 Filippos Giannakos
        }
530 7c130c7d Filippos Giannakos
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 
531 7c130c7d Filippos Giannakos
                                        mapheader_size + max_objidx * objectsize_in_map);
532 feeaaf17 Filippos Giannakos
        if (r < 0){
533 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s", map->volume);
534 7c130c7d Filippos Giannakos
                goto out_put;
535 feeaaf17 Filippos Giannakos
        }
536 d85c1e11 Filippos Giannakos
        char *target = xseg_get_target(peer->xseg, req);
537 d85c1e11 Filippos Giannakos
        strncpy(target, map->volume, req->targetlen);
538 7c130c7d Filippos Giannakos
        char *data = xseg_get_data(peer->xseg, req);
539 7c130c7d Filippos Giannakos
        mapheader_to_map(map, data);
540 7c130c7d Filippos Giannakos
        pos = mapheader_size;
541 d85c1e11 Filippos Giannakos
        req->op = X_WRITE;
542 d85c1e11 Filippos Giannakos
        req->size = req->datalen;
543 d85c1e11 Filippos Giannakos
        req->offset = 0;
544 7c130c7d Filippos Giannakos
545 7c130c7d Filippos Giannakos
        if (map->size % block_size)
546 7c130c7d Filippos Giannakos
                max_objidx++;
547 7c130c7d Filippos Giannakos
        for (i = 0; i < max_objidx; i++) {
548 7c130c7d Filippos Giannakos
                mn = find_object(map, i);
549 d85c1e11 Filippos Giannakos
                if (!mn){
550 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, E, "Cannot find object %lli for map %s",
551 feeaaf17 Filippos Giannakos
                                        (unsigned long long) i, map->volume);
552 7c130c7d Filippos Giannakos
                        goto out_put;
553 d85c1e11 Filippos Giannakos
                }
554 7c130c7d Filippos Giannakos
                object_to_map(data+pos, mn);
555 7c130c7d Filippos Giannakos
                pos += objectsize_in_map;
556 7c130c7d Filippos Giannakos
        }
557 7c130c7d Filippos Giannakos
        r = xseg_set_req_data(peer->xseg, req, pr);
558 feeaaf17 Filippos Giannakos
        if (r < 0){
559 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
560 feeaaf17 Filippos Giannakos
                                map->volume);
561 7c130c7d Filippos Giannakos
                goto out_put;
562 feeaaf17 Filippos Giannakos
        }
563 cc21de66 Giannakos Filippos
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
564 feeaaf17 Filippos Giannakos
        if (p == NoPort){
565 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
566 feeaaf17 Filippos Giannakos
                                map->volume);
567 7c130c7d Filippos Giannakos
                goto out_unset;
568 feeaaf17 Filippos Giannakos
        }
569 7c130c7d Filippos Giannakos
        r = xseg_signal(peer->xseg, p);
570 feeaaf17 Filippos Giannakos
        if (r < 0)
571 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, W, "Cannot signal port %u", p);
572 feeaaf17 Filippos Giannakos
573 d85c1e11 Filippos Giannakos
        map->flags |= MF_MAP_WRITING;
574 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, I, "Writing map %s", map->volume);
575 9d3f16da Giannakos Filippos
        return req;
576 7c130c7d Filippos Giannakos
577 7c130c7d Filippos Giannakos
out_unset:
578 7c130c7d Filippos Giannakos
        xseg_get_req_data(peer->xseg, req, &dummy);
579 7c130c7d Filippos Giannakos
out_put:
580 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
581 7c130c7d Filippos Giannakos
out_err:
582 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, E, "Map write for map %s failed.", map->volume);
583 9d3f16da Giannakos Filippos
        return NULL;
584 9d3f16da Giannakos Filippos
}
585 9d3f16da Giannakos Filippos
586 9d3f16da Giannakos Filippos
static int write_map(struct peer_req* pr, struct map *map)
587 9d3f16da Giannakos Filippos
{
588 9d3f16da Giannakos Filippos
        int r = 0;
589 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
590 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_WRITING;
591 9d3f16da Giannakos Filippos
        struct xseg_request *req = __write_map(pr, map);
592 9d3f16da Giannakos Filippos
        if (!req)
593 9d3f16da Giannakos Filippos
                return -1;
594 9d3f16da Giannakos Filippos
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
595 9d3f16da Giannakos Filippos
        if (req->state & XS_FAILED)
596 9d3f16da Giannakos Filippos
                r = -1;
597 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
598 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_WRITING;
599 9d3f16da Giannakos Filippos
        return r;
600 9d3f16da Giannakos Filippos
}
601 9d3f16da Giannakos Filippos
602 9d3f16da Giannakos Filippos
static struct xseg_request * __load_map(struct peer_req *pr, struct map *m)
603 9d3f16da Giannakos Filippos
{
604 9d3f16da Giannakos Filippos
        int r;
605 9d3f16da Giannakos Filippos
        xport p;
606 9d3f16da Giannakos Filippos
        struct xseg_request *req;
607 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
608 9d3f16da Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
609 9d3f16da Giannakos Filippos
        void *dummy;
610 9d3f16da Giannakos Filippos
611 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Loading ng map %s", m->volume);
612 9d3f16da Giannakos Filippos
613 9d3f16da Giannakos Filippos
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
614 9d3f16da Giannakos Filippos
        if (!req){
615 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
616 9d3f16da Giannakos Filippos
                                m->volume);
617 9d3f16da Giannakos Filippos
                goto out_fail;
618 9d3f16da Giannakos Filippos
        }
619 9d3f16da Giannakos Filippos
620 9d3f16da Giannakos Filippos
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
621 9d3f16da Giannakos Filippos
        if (r < 0){
622 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
623 9d3f16da Giannakos Filippos
                                m->volume);
624 9d3f16da Giannakos Filippos
                goto out_put;
625 9d3f16da Giannakos Filippos
        }
626 9d3f16da Giannakos Filippos
627 9d3f16da Giannakos Filippos
        char *reqtarget = xseg_get_target(peer->xseg, req);
628 9d3f16da Giannakos Filippos
        if (!reqtarget)
629 9d3f16da Giannakos Filippos
                goto out_put;
630 9d3f16da Giannakos Filippos
        strncpy(reqtarget, m->volume, req->targetlen);
631 9d3f16da Giannakos Filippos
        req->op = X_READ;
632 9d3f16da Giannakos Filippos
        req->size = block_size;
633 9d3f16da Giannakos Filippos
        req->offset = 0;
634 9d3f16da Giannakos Filippos
        r = xseg_set_req_data(peer->xseg, req, pr);
635 9d3f16da Giannakos Filippos
        if (r < 0){
636 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
637 9d3f16da Giannakos Filippos
                                m->volume);
638 9d3f16da Giannakos Filippos
                goto out_put;
639 9d3f16da Giannakos Filippos
        }
640 9d3f16da Giannakos Filippos
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
641 9d3f16da Giannakos Filippos
        if (p == NoPort){ 
642 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
643 9d3f16da Giannakos Filippos
                                m->volume);
644 9d3f16da Giannakos Filippos
                goto out_unset;
645 9d3f16da Giannakos Filippos
        }
646 9d3f16da Giannakos Filippos
        r = xseg_signal(peer->xseg, p);
647 9d3f16da Giannakos Filippos
        
648 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map %s loading", m->volume);
649 9d3f16da Giannakos Filippos
        return req;
650 9d3f16da Giannakos Filippos
651 9d3f16da Giannakos Filippos
out_unset:
652 9d3f16da Giannakos Filippos
        xseg_get_req_data(peer->xseg, req, &dummy);
653 9d3f16da Giannakos Filippos
out_put:
654 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
655 9d3f16da Giannakos Filippos
out_fail:
656 9d3f16da Giannakos Filippos
        return NULL;
657 7c130c7d Filippos Giannakos
}
658 7c130c7d Filippos Giannakos
659 9d3f16da Giannakos Filippos
static int read_map (struct map *map, char *buf)
660 40e56a42 Filippos Giannakos
{
661 40e56a42 Filippos Giannakos
        char nulls[SHA256_DIGEST_SIZE];
662 40e56a42 Filippos Giannakos
        memset(nulls, 0, SHA256_DIGEST_SIZE);
663 40e56a42 Filippos Giannakos
664 40e56a42 Filippos Giannakos
        int r = !memcmp(buf, nulls, SHA256_DIGEST_SIZE);
665 40e56a42 Filippos Giannakos
        if (r) {
666 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "Read zeros");
667 40e56a42 Filippos Giannakos
                //read error;
668 40e56a42 Filippos Giannakos
                return -1;
669 40e56a42 Filippos Giannakos
        }
670 40e56a42 Filippos Giannakos
        //type 1, our type, type 0 pithos map
671 40e56a42 Filippos Giannakos
        int type = !memcmp(buf, magic_sha256, SHA256_DIGEST_SIZE);
672 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, I, "Type %d detected for map %s", type, map->volume);
673 40e56a42 Filippos Giannakos
        uint64_t pos;
674 40e56a42 Filippos Giannakos
        uint64_t i, nr_objs;
675 40e56a42 Filippos Giannakos
        struct map_node *map_node;
676 40e56a42 Filippos Giannakos
        if (type) {
677 40e56a42 Filippos Giannakos
                pos = SHA256_DIGEST_SIZE;
678 40e56a42 Filippos Giannakos
                map->size = *(uint64_t *) (buf + pos);
679 40e56a42 Filippos Giannakos
                pos += sizeof(uint64_t);
680 40e56a42 Filippos Giannakos
                nr_objs = map->size / block_size;
681 40e56a42 Filippos Giannakos
                if (map->size % block_size)
682 40e56a42 Filippos Giannakos
                        nr_objs++;
683 40e56a42 Filippos Giannakos
                map_node = calloc(nr_objs, sizeof(struct map_node));
684 40e56a42 Filippos Giannakos
                if (!map_node)
685 40e56a42 Filippos Giannakos
                        return -1;
686 40e56a42 Filippos Giannakos
687 40e56a42 Filippos Giannakos
                for (i = 0; i < nr_objs; i++) {
688 d85c1e11 Filippos Giannakos
                        map_node[i].map = map;
689 40e56a42 Filippos Giannakos
                        map_node[i].objectidx = i;
690 9d3f16da Giannakos Filippos
                        map_node[i].waiters = 0;
691 9d3f16da Giannakos Filippos
                        map_node[i].ref = 1;
692 9d3f16da Giannakos Filippos
                        map_node[i].cond = st_cond_new(); //FIXME err check;
693 40e56a42 Filippos Giannakos
                        map_to_object(&map_node[i], buf + pos);
694 40e56a42 Filippos Giannakos
                        pos += objectsize_in_map;
695 40e56a42 Filippos Giannakos
                        r = insert_object(map, &map_node[i]); //FIXME error check
696 40e56a42 Filippos Giannakos
                }
697 40e56a42 Filippos Giannakos
        } else {
698 40e56a42 Filippos Giannakos
                pos = 0;
699 40e56a42 Filippos Giannakos
                uint64_t max_nr_objs = block_size/SHA256_DIGEST_SIZE;
700 40e56a42 Filippos Giannakos
                map_node = calloc(max_nr_objs, sizeof(struct map_node));
701 40e56a42 Filippos Giannakos
                if (!map_node)
702 40e56a42 Filippos Giannakos
                        return -1;
703 40e56a42 Filippos Giannakos
                for (i = 0; i < max_nr_objs; i++) {
704 40e56a42 Filippos Giannakos
                        if (!memcmp(buf+pos, nulls, SHA256_DIGEST_SIZE))
705 40e56a42 Filippos Giannakos
                                break;
706 40e56a42 Filippos Giannakos
                        map_node[i].objectidx = i;
707 d85c1e11 Filippos Giannakos
                        map_node[i].map = map;
708 9d3f16da Giannakos Filippos
                        map_node[i].waiters = 0;
709 9d3f16da Giannakos Filippos
                        map_node[i].ref = 1;
710 9d3f16da Giannakos Filippos
                        map_node[i].cond = st_cond_new(); //FIXME err check;
711 40e56a42 Filippos Giannakos
                        pithosmap_to_object(&map_node[i], buf + pos);
712 40e56a42 Filippos Giannakos
                        pos += SHA256_DIGEST_SIZE; 
713 40e56a42 Filippos Giannakos
                        r = insert_object(map, &map_node[i]); //FIXME error check
714 40e56a42 Filippos Giannakos
                }
715 40e56a42 Filippos Giannakos
                map->size = i * block_size; 
716 40e56a42 Filippos Giannakos
        }
717 9d3f16da Giannakos Filippos
        print_map(map);
718 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, I, "Map read for map %s completed", map->volume);
719 40e56a42 Filippos Giannakos
        return 0;
720 40e56a42 Filippos Giannakos
721 40e56a42 Filippos Giannakos
        //FIXME cleanup on error
722 40e56a42 Filippos Giannakos
}
723 40e56a42 Filippos Giannakos
724 9d3f16da Giannakos Filippos
static int load_map(struct peer_req *pr, struct map *map)
725 9d3f16da Giannakos Filippos
{
726 9d3f16da Giannakos Filippos
        int r = 0;
727 9d3f16da Giannakos Filippos
        struct xseg_request *req;
728 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
729 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_LOADING;
730 9d3f16da Giannakos Filippos
        req = __load_map(pr, map);
731 9d3f16da Giannakos Filippos
        if (!req)
732 9d3f16da Giannakos Filippos
                return -1;
733 9d3f16da Giannakos Filippos
        wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
734 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_LOADING;
735 9d3f16da Giannakos Filippos
        if (req->state & XS_FAILED){
736 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Map load failed for map %s", map->volume);
737 9d3f16da Giannakos Filippos
                xseg_put_request(peer->xseg, req, pr->portno);
738 9d3f16da Giannakos Filippos
                return -1;
739 9d3f16da Giannakos Filippos
        }
740 9d3f16da Giannakos Filippos
        r = read_map(map, xseg_get_data(peer->xseg, req));
741 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
742 9d3f16da Giannakos Filippos
        return r;
743 9d3f16da Giannakos Filippos
}
744 9d3f16da Giannakos Filippos
745 9d3f16da Giannakos Filippos
static struct xseg_request * __open_map(struct peer_req *pr, struct map *m)
746 9d3f16da Giannakos Filippos
{
747 9d3f16da Giannakos Filippos
        int r;
748 9d3f16da Giannakos Filippos
        xport p;
749 9d3f16da Giannakos Filippos
        struct xseg_request *req;
750 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
751 9d3f16da Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
752 9d3f16da Giannakos Filippos
        void *dummy;
753 9d3f16da Giannakos Filippos
754 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Opening map %s", m->volume);
755 9d3f16da Giannakos Filippos
756 9d3f16da Giannakos Filippos
        req = xseg_get_request(peer->xseg, pr->portno, mapper->mbportno, X_ALLOC);
757 9d3f16da Giannakos Filippos
        if (!req){
758 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot allocate request for map %s",
759 9d3f16da Giannakos Filippos
                                m->volume);
760 9d3f16da Giannakos Filippos
                goto out_fail;
761 9d3f16da Giannakos Filippos
        }
762 9d3f16da Giannakos Filippos
763 9d3f16da Giannakos Filippos
        r = xseg_prep_request(peer->xseg, req, m->volumelen, block_size);
764 9d3f16da Giannakos Filippos
        if (r < 0){
765 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot prepare request for map %s",
766 9d3f16da Giannakos Filippos
                                m->volume);
767 9d3f16da Giannakos Filippos
                goto out_put;
768 9d3f16da Giannakos Filippos
        }
769 9d3f16da Giannakos Filippos
770 9d3f16da Giannakos Filippos
        char *reqtarget = xseg_get_target(peer->xseg, req);
771 9d3f16da Giannakos Filippos
        if (!reqtarget)
772 9d3f16da Giannakos Filippos
                goto out_put;
773 9d3f16da Giannakos Filippos
        strncpy(reqtarget, m->volume, req->targetlen);
774 9d3f16da Giannakos Filippos
        req->op = X_OPEN;
775 9d3f16da Giannakos Filippos
        req->size = block_size;
776 9d3f16da Giannakos Filippos
        req->offset = 0;
777 9d3f16da Giannakos Filippos
        r = xseg_set_req_data(peer->xseg, req, pr);
778 9d3f16da Giannakos Filippos
        if (r < 0){
779 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot set request data for map %s",
780 9d3f16da Giannakos Filippos
                                m->volume);
781 9d3f16da Giannakos Filippos
                goto out_put;
782 9d3f16da Giannakos Filippos
        }
783 9d3f16da Giannakos Filippos
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
784 9d3f16da Giannakos Filippos
        if (p == NoPort){ 
785 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot submit request for map %s",
786 9d3f16da Giannakos Filippos
                                m->volume);
787 9d3f16da Giannakos Filippos
                goto out_unset;
788 9d3f16da Giannakos Filippos
        }
789 9d3f16da Giannakos Filippos
        r = xseg_signal(peer->xseg, p);
790 9d3f16da Giannakos Filippos
        
791 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map %s opening", m->volume);
792 9d3f16da Giannakos Filippos
        return req;
793 9d3f16da Giannakos Filippos
794 9d3f16da Giannakos Filippos
out_unset:
795 9d3f16da Giannakos Filippos
        xseg_get_req_data(peer->xseg, req, &dummy);
796 9d3f16da Giannakos Filippos
out_put:
797 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
798 9d3f16da Giannakos Filippos
out_fail:
799 9d3f16da Giannakos Filippos
        return NULL;
800 9d3f16da Giannakos Filippos
}
801 9d3f16da Giannakos Filippos
802 9d3f16da Giannakos Filippos
static int open_map(struct peer_req *pr, struct map *map)
803 9d3f16da Giannakos Filippos
{
804 9d3f16da Giannakos Filippos
        int err;
805 9d3f16da Giannakos Filippos
        struct xseg_request *req;
806 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
807 9d3f16da Giannakos Filippos
808 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_OPENING;
809 9d3f16da Giannakos Filippos
        req = __open_map(pr, map);
810 9d3f16da Giannakos Filippos
        if (!req)
811 9d3f16da Giannakos Filippos
                return -1;
812 9d3f16da Giannakos Filippos
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
813 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_OPENING;
814 9d3f16da Giannakos Filippos
        err = req->state & XS_FAILED;
815 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
816 9d3f16da Giannakos Filippos
        if (err)
817 9d3f16da Giannakos Filippos
                return -1;
818 9d3f16da Giannakos Filippos
        else 
819 9d3f16da Giannakos Filippos
                map->flags |= MF_MAP_EXCLUSIVE;
820 9d3f16da Giannakos Filippos
        return 0;
821 9d3f16da Giannakos Filippos
}
822 9d3f16da Giannakos Filippos
823 40e56a42 Filippos Giannakos
/*
824 40e56a42 Filippos Giannakos
 * copy up functions
825 40e56a42 Filippos Giannakos
 */
826 40e56a42 Filippos Giannakos
827 7c130c7d Filippos Giannakos
static int __set_copyup_node(struct mapper_io *mio, struct xseg_request *req, struct map_node *mn)
828 7c130c7d Filippos Giannakos
{
829 7c130c7d Filippos Giannakos
        int r = 0;
830 7c130c7d Filippos Giannakos
        if (mn){
831 7c130c7d Filippos Giannakos
                r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
832 7c130c7d Filippos Giannakos
                if (r == -XHASH_ERESIZE) {
833 7c130c7d Filippos Giannakos
                        xhashidx shift = xhash_grow_size_shift(mio->copyups_nodes);
834 7c130c7d Filippos Giannakos
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
835 7c130c7d Filippos Giannakos
                        if (!new_hashmap)
836 7c130c7d Filippos Giannakos
                                goto out;
837 7c130c7d Filippos Giannakos
                        mio->copyups_nodes = new_hashmap;
838 7c130c7d Filippos Giannakos
                        r = xhash_insert(mio->copyups_nodes, (xhashidx) req, (xhashidx) mn);
839 7c130c7d Filippos Giannakos
                }
840 7c130c7d Filippos Giannakos
        }
841 7c130c7d Filippos Giannakos
        else {
842 7c130c7d Filippos Giannakos
                r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
843 7c130c7d Filippos Giannakos
                if (r == -XHASH_ERESIZE) {
844 7c130c7d Filippos Giannakos
                        xhashidx shift = xhash_shrink_size_shift(mio->copyups_nodes);
845 7c130c7d Filippos Giannakos
                        xhash_t *new_hashmap = xhash_resize(mio->copyups_nodes, shift, NULL);
846 7c130c7d Filippos Giannakos
                        if (!new_hashmap)
847 7c130c7d Filippos Giannakos
                                goto out;
848 7c130c7d Filippos Giannakos
                        mio->copyups_nodes = new_hashmap;
849 7c130c7d Filippos Giannakos
                        r = xhash_delete(mio->copyups_nodes, (xhashidx) req);
850 7c130c7d Filippos Giannakos
                }
851 7c130c7d Filippos Giannakos
        }
852 7c130c7d Filippos Giannakos
out:
853 7c130c7d Filippos Giannakos
        return r;
854 7c130c7d Filippos Giannakos
}
855 7c130c7d Filippos Giannakos
856 7c130c7d Filippos Giannakos
static struct map_node * __get_copyup_node(struct mapper_io *mio, struct xseg_request *req)
857 7c130c7d Filippos Giannakos
{
858 7c130c7d Filippos Giannakos
        struct map_node *mn;
859 7c130c7d Filippos Giannakos
        int r = xhash_lookup(mio->copyups_nodes, (xhashidx) req, (xhashidx *) &mn);
860 7c130c7d Filippos Giannakos
        if (r < 0)
861 7c130c7d Filippos Giannakos
                return NULL;
862 7c130c7d Filippos Giannakos
        return mn;
863 7c130c7d Filippos Giannakos
}
864 7c130c7d Filippos Giannakos
865 9d3f16da Giannakos Filippos
static struct xseg_request * copyup_object(struct peerd *peer, struct map_node *mn, struct peer_req *pr)
866 7c130c7d Filippos Giannakos
{
867 7c130c7d Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
868 7c130c7d Filippos Giannakos
        struct mapper_io *mio = __get_mapper_io(pr);
869 42a0461c Giannakos Filippos
        struct map *map = mn->map;
870 7c130c7d Filippos Giannakos
        void *dummy;
871 7c130c7d Filippos Giannakos
        int r = -1, i;
872 7c130c7d Filippos Giannakos
        xport p;
873 6805c197 Giannakos Filippos
874 6805c197 Giannakos Filippos
        //struct sha256_ctx sha256ctx;
875 7c130c7d Filippos Giannakos
        uint32_t newtargetlen;
876 65062a91 Filippos Giannakos
        char new_target[XSEG_MAX_TARGETLEN + 1]; 
877 5ad213f0 Filippos Giannakos
        unsigned char buf[SHA256_DIGEST_SIZE];        //assert sha256_digest_size(32) <= MAXTARGETLEN 
878 65062a91 Filippos Giannakos
        char new_object[XSEG_MAX_TARGETLEN + 20]; //20 is an arbitrary padding able to hold string representation of objectidx
879 42a0461c Giannakos Filippos
        strncpy(new_object, map->volume, map->volumelen);
880 42a0461c Giannakos Filippos
        sprintf(new_object + map->volumelen, "%u", mn->objectidx); //sprintf adds null termination
881 65062a91 Filippos Giannakos
        new_object[XSEG_MAX_TARGETLEN + 19] = 0;
882 7c130c7d Filippos Giannakos
883 6805c197 Giannakos Filippos
        gcry_md_hash_buffer(GCRY_MD_SHA256, buf, new_object, strlen(new_object));
884 7c130c7d Filippos Giannakos
        for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
885 5ad213f0 Filippos Giannakos
                sprintf (new_target + 2*i, "%02x", buf[i]);
886 40e56a42 Filippos Giannakos
        newtargetlen = SHA256_DIGEST_SIZE  * 2;
887 7c130c7d Filippos Giannakos
888 c3382e3b Giannakos Filippos
        if (!strncmp(mn->object, zero_block, (mn->objectlen < HEXLIFIED_SHA256_DIGEST_SIZE)? mn->objectlen : HEXLIFIED_SHA256_DIGEST_SIZE)) 
889 c3382e3b Giannakos Filippos
                goto copyup_zeroblock;
890 7c130c7d Filippos Giannakos
891 cc21de66 Giannakos Filippos
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
892 7c130c7d Filippos Giannakos
                                                        mapper->bportno, X_ALLOC);
893 99dad167 Giannakos Filippos
        if (!req){
894 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
895 feeaaf17 Filippos Giannakos
                goto out_err;
896 99dad167 Giannakos Filippos
        }
897 7c130c7d Filippos Giannakos
        r = xseg_prep_request(peer->xseg, req, newtargetlen, 
898 7c130c7d Filippos Giannakos
                                sizeof(struct xseg_request_copy));
899 99dad167 Giannakos Filippos
        if (r < 0){
900 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot prepare request for object %s", mn->object);
901 7c130c7d Filippos Giannakos
                goto out_put;
902 99dad167 Giannakos Filippos
        }
903 7c130c7d Filippos Giannakos
904 7c130c7d Filippos Giannakos
        char *target = xseg_get_target(peer->xseg, req);
905 3602502b Giannakos Filippos
        strncpy(target, new_target, req->targetlen);
906 7c130c7d Filippos Giannakos
907 7c130c7d Filippos Giannakos
        struct xseg_request_copy *xcopy = (struct xseg_request_copy *) xseg_get_data(peer->xseg, req);
908 40e56a42 Filippos Giannakos
        strncpy(xcopy->target, mn->object, mn->objectlen);
909 65062a91 Filippos Giannakos
        xcopy->targetlen = mn->objectlen;
910 7c130c7d Filippos Giannakos
911 7c130c7d Filippos Giannakos
        req->offset = 0;
912 7c130c7d Filippos Giannakos
        req->size = block_size;
913 7c130c7d Filippos Giannakos
        req->op = X_COPY;
914 7c130c7d Filippos Giannakos
        r = xseg_set_req_data(peer->xseg, req, pr);
915 99dad167 Giannakos Filippos
        if (r<0){
916 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot set request data for object %s", mn->object);
917 7c130c7d Filippos Giannakos
                goto out_put;
918 99dad167 Giannakos Filippos
        }
919 7c130c7d Filippos Giannakos
        r = __set_copyup_node(mio, req, mn);
920 cc21de66 Giannakos Filippos
        p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
921 7c130c7d Filippos Giannakos
        if (p == NoPort) {
922 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot submit for object %s", mn->object);
923 7c130c7d Filippos Giannakos
                goto out_unset;
924 7c130c7d Filippos Giannakos
        }
925 7c130c7d Filippos Giannakos
        xseg_signal(peer->xseg, p);
926 9d3f16da Giannakos Filippos
//        mio->copyups++;
927 7c130c7d Filippos Giannakos
928 c3382e3b Giannakos Filippos
        mn->flags |= MF_OBJECT_COPYING;
929 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, I, "Copying up object %s \n\t to %s", mn->object, new_target);
930 9d3f16da Giannakos Filippos
        return req;
931 7c130c7d Filippos Giannakos
932 7c130c7d Filippos Giannakos
out_unset:
933 c3382e3b Giannakos Filippos
        r = __set_copyup_node(mio, req, NULL);
934 7c130c7d Filippos Giannakos
        xseg_get_req_data(peer->xseg, req, &dummy);
935 7c130c7d Filippos Giannakos
out_put:
936 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
937 feeaaf17 Filippos Giannakos
out_err:
938 feeaaf17 Filippos Giannakos
        XSEGLOG2(&lc, E, "Copying up object %s \n\t to %s failed", mn->object, new_target);
939 9d3f16da Giannakos Filippos
        return NULL;
940 7c130c7d Filippos Giannakos
941 c3382e3b Giannakos Filippos
copyup_zeroblock:
942 c3382e3b Giannakos Filippos
        XSEGLOG2(&lc, I, "Copying up of zero block is not needed."
943 c3382e3b Giannakos Filippos
                        "Proceeding in writing the new object in map");
944 c3382e3b Giannakos Filippos
        /* construct a tmp map_node for writing purposes */
945 c3382e3b Giannakos Filippos
        struct map_node newmn = *mn;
946 c3382e3b Giannakos Filippos
        newmn.flags = MF_OBJECT_EXIST;
947 c3382e3b Giannakos Filippos
        strncpy(newmn.object, new_target, newtargetlen);
948 c3382e3b Giannakos Filippos
        newmn.object[newtargetlen] = 0;
949 c3382e3b Giannakos Filippos
        newmn.objectlen = newtargetlen;
950 c3382e3b Giannakos Filippos
        newmn.objectidx = mn->objectidx; 
951 9d3f16da Giannakos Filippos
        req = object_write(peer, pr, map, &newmn);
952 c3382e3b Giannakos Filippos
        r = __set_copyup_node(mio, req, mn);
953 9d3f16da Giannakos Filippos
        if (!req){
954 c3382e3b Giannakos Filippos
                XSEGLOG2(&lc, E, "Object write returned error for object %s"
955 c3382e3b Giannakos Filippos
                                "\n\t of map %s [%llu]",
956 c3382e3b Giannakos Filippos
                                mn->object, map->volume, (unsigned long long) mn->objectidx);
957 9d3f16da Giannakos Filippos
                return NULL;
958 c3382e3b Giannakos Filippos
        }
959 c3382e3b Giannakos Filippos
        mn->flags |= MF_OBJECT_WRITING;
960 c3382e3b Giannakos Filippos
        XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
961 9d3f16da Giannakos Filippos
        return req;
962 7c130c7d Filippos Giannakos
}
963 7c130c7d Filippos Giannakos
964 9d3f16da Giannakos Filippos
static struct xseg_request * delete_object(struct peer_req *pr, struct map_node *mn)
965 7c130c7d Filippos Giannakos
{
966 9d3f16da Giannakos Filippos
        void *dummy;
967 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
968 7c130c7d Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
969 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
970 9d3f16da Giannakos Filippos
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
971 9d3f16da Giannakos Filippos
                                                        mapper->bportno, X_ALLOC);
972 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Deleting mapnode %s", mn->object);
973 99dad167 Giannakos Filippos
        if (!req){
974 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot get request for object %s", mn->object);
975 7c130c7d Filippos Giannakos
                goto out_err;
976 99dad167 Giannakos Filippos
        }
977 9d3f16da Giannakos Filippos
        int r = xseg_prep_request(peer->xseg, req, mn->objectlen, 0);
978 99dad167 Giannakos Filippos
        if (r < 0){
979 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot prep request for object %s", mn->object);
980 9d3f16da Giannakos Filippos
                goto out_put;
981 99dad167 Giannakos Filippos
        }
982 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, req);
983 9d3f16da Giannakos Filippos
        strncpy(target, mn->object, req->targetlen);
984 9d3f16da Giannakos Filippos
        req->op = X_DELETE;
985 9d3f16da Giannakos Filippos
        req->size = req->datalen;
986 9d3f16da Giannakos Filippos
        req->offset = 0;
987 9d3f16da Giannakos Filippos
        r = xseg_set_req_data(peer->xseg, req, pr);
988 99dad167 Giannakos Filippos
        if (r < 0){
989 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot set req data for object %s", mn->object);
990 9d3f16da Giannakos Filippos
                goto out_put;
991 99dad167 Giannakos Filippos
        }
992 9d3f16da Giannakos Filippos
        __set_copyup_node(mio, req, mn);
993 9d3f16da Giannakos Filippos
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
994 99dad167 Giannakos Filippos
        if (p == NoPort){
995 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot submit request for object %s", mn->object);
996 9d3f16da Giannakos Filippos
                goto out_unset;
997 99dad167 Giannakos Filippos
        }
998 9d3f16da Giannakos Filippos
        r = xseg_signal(peer->xseg, p);
999 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Object %s deletion pending", mn->object);
1000 9d3f16da Giannakos Filippos
        return req;
1001 7c130c7d Filippos Giannakos
1002 9d3f16da Giannakos Filippos
out_unset:
1003 9d3f16da Giannakos Filippos
        xseg_get_req_data(peer->xseg, req, &dummy);
1004 9d3f16da Giannakos Filippos
out_put:
1005 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
1006 d85c1e11 Filippos Giannakos
out_err:
1007 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Object %s deletion failed", mn->object);
1008 9d3f16da Giannakos Filippos
        return NULL;
1009 d85c1e11 Filippos Giannakos
}
1010 d85c1e11 Filippos Giannakos
1011 9d3f16da Giannakos Filippos
static struct xseg_request * delete_map(struct peer_req *pr, struct map *map)
1012 d85c1e11 Filippos Giannakos
{
1013 9d3f16da Giannakos Filippos
        void *dummy;
1014 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1015 d85c1e11 Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
1016 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
1017 9d3f16da Giannakos Filippos
        struct xseg_request *req = xseg_get_request(peer->xseg, pr->portno, 
1018 9d3f16da Giannakos Filippos
                                                        mapper->mbportno, X_ALLOC);
1019 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Deleting map %s", map->volume);
1020 99dad167 Giannakos Filippos
        if (!req){
1021 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot get request for map %s", map->volume);
1022 d85c1e11 Filippos Giannakos
                goto out_err;
1023 99dad167 Giannakos Filippos
        }
1024 9d3f16da Giannakos Filippos
        int r = xseg_prep_request(peer->xseg, req, map->volumelen, 0);
1025 99dad167 Giannakos Filippos
        if (r < 0){
1026 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot prep request for map %s", map->volume);
1027 9d3f16da Giannakos Filippos
                goto out_put;
1028 99dad167 Giannakos Filippos
        }
1029 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, req);
1030 9d3f16da Giannakos Filippos
        strncpy(target, map->volume, req->targetlen);
1031 9d3f16da Giannakos Filippos
        req->op = X_DELETE;
1032 9d3f16da Giannakos Filippos
        req->size = req->datalen;
1033 9d3f16da Giannakos Filippos
        req->offset = 0;
1034 9d3f16da Giannakos Filippos
        r = xseg_set_req_data(peer->xseg, req, pr);
1035 99dad167 Giannakos Filippos
        if (r < 0){
1036 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot set req data for map %s", map->volume);
1037 9d3f16da Giannakos Filippos
                goto out_put;
1038 99dad167 Giannakos Filippos
        }
1039 9d3f16da Giannakos Filippos
        __set_copyup_node(mio, req, NULL);
1040 9d3f16da Giannakos Filippos
        xport p = xseg_submit(peer->xseg, req, pr->portno, X_ALLOC);
1041 99dad167 Giannakos Filippos
        if (p == NoPort){
1042 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot submit request for map %s", map->volume);
1043 9d3f16da Giannakos Filippos
                goto out_unset;
1044 99dad167 Giannakos Filippos
        }
1045 9d3f16da Giannakos Filippos
        r = xseg_signal(peer->xseg, p);
1046 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_DELETING;
1047 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map %s deletion pending", map->volume);
1048 9d3f16da Giannakos Filippos
        return req;
1049 d85c1e11 Filippos Giannakos
1050 9d3f16da Giannakos Filippos
out_unset:
1051 9d3f16da Giannakos Filippos
        xseg_get_req_data(peer->xseg, req, &dummy);
1052 9d3f16da Giannakos Filippos
out_put:
1053 cc21de66 Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
1054 7c130c7d Filippos Giannakos
out_err:
1055 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, E, "Map %s deletion failed", map->volume);
1056 9d3f16da Giannakos Filippos
        return  NULL;
1057 7c130c7d Filippos Giannakos
}
1058 7c130c7d Filippos Giannakos
1059 9d3f16da Giannakos Filippos
1060 9d3f16da Giannakos Filippos
static inline struct map_node * get_mapnode(struct map *map, uint32_t index)
1061 7c130c7d Filippos Giannakos
{
1062 9d3f16da Giannakos Filippos
        struct map_node *mn = find_object(map, index);
1063 9d3f16da Giannakos Filippos
        if (mn)
1064 9d3f16da Giannakos Filippos
                mn->ref++;
1065 9d3f16da Giannakos Filippos
        return mn;
1066 9d3f16da Giannakos Filippos
}
1067 feeaaf17 Filippos Giannakos
1068 9d3f16da Giannakos Filippos
static inline void put_mapnode(struct map_node *mn)
1069 9d3f16da Giannakos Filippos
{
1070 9d3f16da Giannakos Filippos
        mn->ref--;
1071 9d3f16da Giannakos Filippos
        if (!mn->ref){
1072 9d3f16da Giannakos Filippos
                //clean up mn
1073 9d3f16da Giannakos Filippos
                st_cond_destroy(mn->cond);
1074 d85c1e11 Filippos Giannakos
        }
1075 9d3f16da Giannakos Filippos
}
1076 d85c1e11 Filippos Giannakos
1077 9d3f16da Giannakos Filippos
static inline void __get_map(struct map *map)
1078 9d3f16da Giannakos Filippos
{
1079 9d3f16da Giannakos Filippos
        map->ref++;
1080 9d3f16da Giannakos Filippos
}
1081 d85c1e11 Filippos Giannakos
1082 9d3f16da Giannakos Filippos
static inline void put_map(struct map *map)
1083 9d3f16da Giannakos Filippos
{
1084 9d3f16da Giannakos Filippos
        struct map_node *mn;
1085 9d3f16da Giannakos Filippos
        map->ref--;
1086 9d3f16da Giannakos Filippos
        if (!map->ref){
1087 4fb9bce5 Giannakos Filippos
                XSEGLOG2(&lc, I, "Freeing map %s", map->volume);
1088 9d3f16da Giannakos Filippos
                //clean up map
1089 9d3f16da Giannakos Filippos
                uint64_t i;
1090 9d3f16da Giannakos Filippos
                for (i = 0; i < calc_map_obj(map); i++) {
1091 9d3f16da Giannakos Filippos
                        mn = get_mapnode(map, i);
1092 9d3f16da Giannakos Filippos
                        if (mn) {
1093 9d3f16da Giannakos Filippos
                                //make sure all pending operations on all objects are completed
1094 9d3f16da Giannakos Filippos
                                if (mn->flags & MF_OBJECT_NOT_READY){
1095 9d3f16da Giannakos Filippos
                                        //this should never happen...
1096 9d3f16da Giannakos Filippos
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1097 9d3f16da Giannakos Filippos
                                }
1098 9d3f16da Giannakos Filippos
                                mn->flags &= MF_OBJECT_DESTROYED;
1099 9d3f16da Giannakos Filippos
                                put_mapnode(mn); //matchin mn->ref = 1 on mn init
1100 9d3f16da Giannakos Filippos
                                put_mapnode(mn); //matcing get_mapnode;
1101 efb7bdb0 Giannakos Filippos
                                //assert mn->ref == 0;
1102 9d3f16da Giannakos Filippos
                        }
1103 9d3f16da Giannakos Filippos
                }
1104 9d3f16da Giannakos Filippos
                mn = find_object(map, 0);
1105 9d3f16da Giannakos Filippos
                if (mn)
1106 9d3f16da Giannakos Filippos
                        free(mn);
1107 4fb9bce5 Giannakos Filippos
                XSEGLOG2(&lc, I, "Freed map %s", map->volume);
1108 9d3f16da Giannakos Filippos
                free(map);
1109 d85c1e11 Filippos Giannakos
        }
1110 9d3f16da Giannakos Filippos
}
1111 d85c1e11 Filippos Giannakos
1112 9d3f16da Giannakos Filippos
static struct map * create_map(struct mapperd *mapper, char *name, uint32_t namelen)
1113 9d3f16da Giannakos Filippos
{
1114 9d3f16da Giannakos Filippos
        int r;
1115 9d3f16da Giannakos Filippos
        struct map *m = malloc(sizeof(struct map));
1116 9d3f16da Giannakos Filippos
        if (!m){
1117 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot allocate map ");
1118 7c130c7d Filippos Giannakos
                goto out_err;
1119 7c130c7d Filippos Giannakos
        }
1120 9d3f16da Giannakos Filippos
        m->size = -1;
1121 9d3f16da Giannakos Filippos
        strncpy(m->volume, name, namelen);
1122 9d3f16da Giannakos Filippos
        m->volume[namelen] = 0;
1123 9d3f16da Giannakos Filippos
        m->volumelen = namelen;
1124 9d3f16da Giannakos Filippos
        m->flags = 0;
1125 9d3f16da Giannakos Filippos
        m->objects = xhash_new(3, INTEGER); 
1126 9d3f16da Giannakos Filippos
        if (!m->objects){
1127 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot allocate object hashmap for map %s",
1128 9d3f16da Giannakos Filippos
                                m->volume);
1129 9d3f16da Giannakos Filippos
                goto out_map;
1130 feeaaf17 Filippos Giannakos
        }
1131 9d3f16da Giannakos Filippos
        m->ref = 1;
1132 9d3f16da Giannakos Filippos
        m->waiters = 0;
1133 9d3f16da Giannakos Filippos
        m->cond = st_cond_new(); //FIXME err check;
1134 9d3f16da Giannakos Filippos
        r = insert_map(mapper, m);
1135 99dad167 Giannakos Filippos
        if (r < 0){
1136 99dad167 Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot insert map %s", m->volume);
1137 9d3f16da Giannakos Filippos
                goto out_hash;
1138 99dad167 Giannakos Filippos
        }
1139 9d3f16da Giannakos Filippos
1140 9d3f16da Giannakos Filippos
        return m;
1141 9d3f16da Giannakos Filippos
1142 9d3f16da Giannakos Filippos
out_hash:
1143 9d3f16da Giannakos Filippos
        xhash_free(m->objects);
1144 9d3f16da Giannakos Filippos
out_map:
1145 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, E, "failed to create map %s", m->volume);
1146 9d3f16da Giannakos Filippos
        free(m);
1147 9d3f16da Giannakos Filippos
out_err:
1148 9d3f16da Giannakos Filippos
        return NULL;
1149 9d3f16da Giannakos Filippos
}
1150 9d3f16da Giannakos Filippos
1151 9d3f16da Giannakos Filippos
1152 9d3f16da Giannakos Filippos
1153 9d3f16da Giannakos Filippos
void deletion_cb(struct peer_req *pr, struct xseg_request *req)
1154 9d3f16da Giannakos Filippos
{
1155 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1156 9d3f16da Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
1157 9d3f16da Giannakos Filippos
        (void)mapper;
1158 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
1159 9d3f16da Giannakos Filippos
        struct map_node *mn = __get_copyup_node(mio, req);
1160 9d3f16da Giannakos Filippos
1161 9d3f16da Giannakos Filippos
        mio->del_pending--;
1162 9d3f16da Giannakos Filippos
        if (req->state & XS_FAILED){
1163 9d3f16da Giannakos Filippos
                mio->err = 1;
1164 40e56a42 Filippos Giannakos
        }
1165 9d3f16da Giannakos Filippos
        signal_mapnode(mn);
1166 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
1167 9d3f16da Giannakos Filippos
        signal_pr(pr);
1168 9d3f16da Giannakos Filippos
}
1169 feeaaf17 Filippos Giannakos
1170 9d3f16da Giannakos Filippos
void copyup_cb(struct peer_req *pr, struct xseg_request *req)
1171 9d3f16da Giannakos Filippos
{
1172 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1173 9d3f16da Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
1174 9d3f16da Giannakos Filippos
        (void)mapper;
1175 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
1176 9d3f16da Giannakos Filippos
        struct map_node *mn = __get_copyup_node(mio, req);
1177 9d3f16da Giannakos Filippos
        if (!mn){
1178 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot get map node");
1179 7c130c7d Filippos Giannakos
                goto out_err;
1180 7c130c7d Filippos Giannakos
        }
1181 9d3f16da Giannakos Filippos
        __set_copyup_node(mio, req, NULL);
1182 7c130c7d Filippos Giannakos
1183 9d3f16da Giannakos Filippos
        if (req->state & XS_FAILED){
1184 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Req failed");
1185 9d3f16da Giannakos Filippos
                mn->flags &= ~MF_OBJECT_COPYING;
1186 9d3f16da Giannakos Filippos
                mn->flags &= ~MF_OBJECT_WRITING;
1187 9d3f16da Giannakos Filippos
                goto out_err;
1188 7c130c7d Filippos Giannakos
        }
1189 9d3f16da Giannakos Filippos
        if (req->op == X_WRITE) {
1190 9d3f16da Giannakos Filippos
                char *target = xseg_get_target(peer->xseg, req);
1191 9d3f16da Giannakos Filippos
                (void)target;
1192 9d3f16da Giannakos Filippos
                //printf("handle object write replyi\n");
1193 9d3f16da Giannakos Filippos
                __set_copyup_node(mio, req, NULL);
1194 9d3f16da Giannakos Filippos
                //assert mn->flags & MF_OBJECT_WRITING
1195 9d3f16da Giannakos Filippos
                mn->flags &= ~MF_OBJECT_WRITING;
1196 9d3f16da Giannakos Filippos
                
1197 9d3f16da Giannakos Filippos
                struct map_node tmp;
1198 9d3f16da Giannakos Filippos
                char *data = xseg_get_data(peer->xseg, req);
1199 9d3f16da Giannakos Filippos
                map_to_object(&tmp, data);
1200 9d3f16da Giannakos Filippos
                mn->flags |= MF_OBJECT_EXIST;
1201 9d3f16da Giannakos Filippos
                if (mn->flags != MF_OBJECT_EXIST){
1202 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, E, "map node %s has wrong flags", mn->object);
1203 9d3f16da Giannakos Filippos
                        goto out_err;
1204 40e56a42 Filippos Giannakos
                }
1205 9d3f16da Giannakos Filippos
                //assert mn->flags & MF_OBJECT_EXIST
1206 9d3f16da Giannakos Filippos
                strncpy(mn->object, tmp.object, tmp.objectlen);
1207 9d3f16da Giannakos Filippos
                mn->object[tmp.objectlen] = 0;
1208 9d3f16da Giannakos Filippos
                mn->objectlen = tmp.objectlen;
1209 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, I, "Object write of %s completed successfully", mn->object);
1210 9d3f16da Giannakos Filippos
                mio->copyups--;
1211 9d3f16da Giannakos Filippos
                signal_mapnode(mn);
1212 9d3f16da Giannakos Filippos
        } else if (req->op == X_COPY) {
1213 9d3f16da Giannakos Filippos
        //        issue write_object;
1214 9d3f16da Giannakos Filippos
                mn->flags &= ~MF_OBJECT_COPYING;
1215 9d3f16da Giannakos Filippos
                struct map *map = mn->map;
1216 9d3f16da Giannakos Filippos
                if (!map){
1217 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, E, "Object %s has not map back pointer", mn->object);
1218 9d3f16da Giannakos Filippos
                        goto out_err;
1219 feeaaf17 Filippos Giannakos
                }
1220 9d3f16da Giannakos Filippos
1221 9d3f16da Giannakos Filippos
                /* construct a tmp map_node for writing purposes */
1222 9d3f16da Giannakos Filippos
                char *target = xseg_get_target(peer->xseg, req);
1223 9d3f16da Giannakos Filippos
                struct map_node newmn = *mn;
1224 9d3f16da Giannakos Filippos
                newmn.flags = MF_OBJECT_EXIST;
1225 9d3f16da Giannakos Filippos
                strncpy(newmn.object, target, req->targetlen);
1226 9d3f16da Giannakos Filippos
                newmn.object[req->targetlen] = 0;
1227 9d3f16da Giannakos Filippos
                newmn.objectlen = req->targetlen;
1228 9d3f16da Giannakos Filippos
                newmn.objectidx = mn->objectidx; 
1229 9d3f16da Giannakos Filippos
                struct xseg_request *xreq = object_write(peer, pr, map, &newmn);
1230 9d3f16da Giannakos Filippos
                if (!xreq){
1231 9d3f16da Giannakos Filippos
                        XSEGLOG2(&lc, E, "Object write returned error for object %s"
1232 9d3f16da Giannakos Filippos
                                        "\n\t of map %s [%llu]",
1233 9d3f16da Giannakos Filippos
                                        mn->object, map->volume, (unsigned long long) mn->objectidx);
1234 9d3f16da Giannakos Filippos
                        goto out_err;
1235 9d3f16da Giannakos Filippos
                }
1236 9d3f16da Giannakos Filippos
                mn->flags |= MF_OBJECT_WRITING;
1237 9d3f16da Giannakos Filippos
                __set_copyup_node (mio, xreq, mn);
1238 9d3f16da Giannakos Filippos
1239 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, I, "Object %s copy up completed. Pending writing.", mn->object);
1240 d85c1e11 Filippos Giannakos
        } else {
1241 9d3f16da Giannakos Filippos
                //wtf??
1242 9d3f16da Giannakos Filippos
                ;
1243 d85c1e11 Filippos Giannakos
        }
1244 7c130c7d Filippos Giannakos
1245 9d3f16da Giannakos Filippos
out:
1246 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
1247 9d3f16da Giannakos Filippos
        return;
1248 9d3f16da Giannakos Filippos
1249 7c130c7d Filippos Giannakos
out_err:
1250 9d3f16da Giannakos Filippos
        mio->copyups--;
1251 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, D, "Mio->copyups: %u", mio->copyups);
1252 9d3f16da Giannakos Filippos
        mio->err = 1;
1253 9d3f16da Giannakos Filippos
        if (mn)
1254 9d3f16da Giannakos Filippos
                signal_mapnode(mn);
1255 9d3f16da Giannakos Filippos
        goto out;
1256 9d3f16da Giannakos Filippos
1257 7c130c7d Filippos Giannakos
}
1258 7c130c7d Filippos Giannakos
1259 9d3f16da Giannakos Filippos
struct r2o {
1260 9d3f16da Giannakos Filippos
        struct map_node *mn;
1261 9d3f16da Giannakos Filippos
        uint64_t offset;
1262 9d3f16da Giannakos Filippos
        uint64_t size;
1263 9d3f16da Giannakos Filippos
};
1264 9d3f16da Giannakos Filippos
1265 9d3f16da Giannakos Filippos
static int req2objs(struct peer_req *pr, struct map *map, int write)
1266 7c130c7d Filippos Giannakos
{
1267 9d3f16da Giannakos Filippos
        int r = 0;
1268 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1269 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
1270 7c130c7d Filippos Giannakos
        char *target = xseg_get_target(peer->xseg, pr->req);
1271 7c130c7d Filippos Giannakos
        uint32_t nr_objs = calc_nr_obj(pr->req);
1272 7c130c7d Filippos Giannakos
        uint64_t size = sizeof(struct xseg_reply_map) + 
1273 7c130c7d Filippos Giannakos
                        nr_objs * sizeof(struct xseg_reply_map_scatterlist);
1274 9d3f16da Giannakos Filippos
        uint32_t idx, i, ready;
1275 9d3f16da Giannakos Filippos
        uint64_t rem_size, obj_index, obj_offset, obj_size; 
1276 9d3f16da Giannakos Filippos
        struct map_node *mn;
1277 9d3f16da Giannakos Filippos
        mio->copyups = 0;
1278 6d417535 Giannakos Filippos
        XSEGLOG2(&lc, D, "Calculated %u nr_objs", nr_objs);
1279 9d3f16da Giannakos Filippos
1280 9d3f16da Giannakos Filippos
        /* get map_nodes of request */
1281 9d3f16da Giannakos Filippos
        struct r2o *mns = malloc(sizeof(struct r2o)*nr_objs);
1282 9d3f16da Giannakos Filippos
        if (!mns){
1283 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot allocate mns");
1284 7c130c7d Filippos Giannakos
                return -1;
1285 7c130c7d Filippos Giannakos
        }
1286 9d3f16da Giannakos Filippos
        idx = 0;
1287 9d3f16da Giannakos Filippos
        rem_size = pr->req->size;
1288 9d3f16da Giannakos Filippos
        obj_index = pr->req->offset / block_size;
1289 9d3f16da Giannakos Filippos
        obj_offset = pr->req->offset & (block_size -1); //modulo
1290 9d3f16da Giannakos Filippos
        obj_size =  (obj_offset + rem_size > block_size) ? block_size - obj_offset : rem_size;
1291 9d3f16da Giannakos Filippos
        mn = get_mapnode(map, obj_index);
1292 7c130c7d Filippos Giannakos
        if (!mn) {
1293 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1294 9d3f16da Giannakos Filippos
                r = -1;
1295 9d3f16da Giannakos Filippos
                goto out;
1296 7c130c7d Filippos Giannakos
        }
1297 9d3f16da Giannakos Filippos
        mns[idx].mn = mn;
1298 9d3f16da Giannakos Filippos
        mns[idx].offset = obj_offset;
1299 9d3f16da Giannakos Filippos
        mns[idx].size = obj_size;
1300 7c130c7d Filippos Giannakos
        rem_size -= obj_size;
1301 7c130c7d Filippos Giannakos
        while (rem_size > 0) {
1302 7c130c7d Filippos Giannakos
                idx++;
1303 7c130c7d Filippos Giannakos
                obj_index++;
1304 7c130c7d Filippos Giannakos
                obj_offset = 0;
1305 d85c1e11 Filippos Giannakos
                obj_size = (rem_size >  block_size) ? block_size : rem_size;
1306 7c130c7d Filippos Giannakos
                rem_size -= obj_size;
1307 9d3f16da Giannakos Filippos
                mn = get_mapnode(map, obj_index);
1308 7c130c7d Filippos Giannakos
                if (!mn) {
1309 feeaaf17 Filippos Giannakos
                        XSEGLOG2(&lc, E, "Cannot find obj_index %llu\n", (unsigned long long) obj_index);
1310 9d3f16da Giannakos Filippos
                        r = -1;
1311 9d3f16da Giannakos Filippos
                        goto out;
1312 7c130c7d Filippos Giannakos
                }
1313 9d3f16da Giannakos Filippos
                mns[idx].mn = mn;
1314 9d3f16da Giannakos Filippos
                mns[idx].offset = obj_offset;
1315 9d3f16da Giannakos Filippos
                mns[idx].size = obj_size;
1316 9d3f16da Giannakos Filippos
        }
1317 9d3f16da Giannakos Filippos
        if (write) {
1318 9d3f16da Giannakos Filippos
                ready = 0;
1319 9d3f16da Giannakos Filippos
                int can_wait = 0;
1320 9d3f16da Giannakos Filippos
                mio->cb=copyup_cb;
1321 9d3f16da Giannakos Filippos
                while (ready < (idx + 1)){
1322 9d3f16da Giannakos Filippos
                        ready = 0;
1323 9d3f16da Giannakos Filippos
                        for (i = 0; i < (idx+1); i++) {
1324 9d3f16da Giannakos Filippos
                                mn = mns[i].mn;
1325 9d3f16da Giannakos Filippos
                                //do copyups
1326 9d3f16da Giannakos Filippos
                                if (mn->flags & MF_OBJECT_NOT_READY) {
1327 9d3f16da Giannakos Filippos
                                        if (can_wait){
1328 9d3f16da Giannakos Filippos
                                                wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1329 9d3f16da Giannakos Filippos
                                                if (mn->flags & MF_OBJECT_DELETED){
1330 9d3f16da Giannakos Filippos
                                                        mio->err = 1;
1331 9d3f16da Giannakos Filippos
                                                }
1332 9d3f16da Giannakos Filippos
                                                if (mio->err){
1333 2979e589 Filippos Giannakos
                                                        XSEGLOG2(&lc, E, "Mio-err, pending_copyups: %d", mio->copyups);
1334 9d3f16da Giannakos Filippos
                                                        if (!mio->copyups){
1335 9d3f16da Giannakos Filippos
                                                                r = -1;
1336 9d3f16da Giannakos Filippos
                                                                goto out;
1337 9d3f16da Giannakos Filippos
                                                        }
1338 9d3f16da Giannakos Filippos
                                                }
1339 9d3f16da Giannakos Filippos
                                        }
1340 9d3f16da Giannakos Filippos
                                }
1341 9d3f16da Giannakos Filippos
                                else if (!(mn->flags & MF_OBJECT_EXIST)) {
1342 9d3f16da Giannakos Filippos
                                        //calc new_target, copy up object
1343 9d3f16da Giannakos Filippos
                                        if (copyup_object(peer, mn, pr) == NULL){
1344 9d3f16da Giannakos Filippos
                                                XSEGLOG2(&lc, E, "Error in copy up object");
1345 9d3f16da Giannakos Filippos
                                        } else {
1346 9d3f16da Giannakos Filippos
                                                mio->copyups++;
1347 9d3f16da Giannakos Filippos
                                        }
1348 9d3f16da Giannakos Filippos
                                } else {
1349 9d3f16da Giannakos Filippos
                                        ready++;
1350 9d3f16da Giannakos Filippos
                                }
1351 40e56a42 Filippos Giannakos
                        }
1352 9d3f16da Giannakos Filippos
                        can_wait = 1;
1353 7c130c7d Filippos Giannakos
                }
1354 9d3f16da Giannakos Filippos
                /*
1355 9d3f16da Giannakos Filippos
pending_copyups:
1356 9d3f16da Giannakos Filippos
                while(mio->copyups > 0){
1357 9d3f16da Giannakos Filippos
                        mio->cb = copyup_cb;
1358 9d3f16da Giannakos Filippos
                        wait_on_pr(pr, 0);
1359 9d3f16da Giannakos Filippos
                        ta--;
1360 9d3f16da Giannakos Filippos
                        st_cond_wait(pr->cond);
1361 9d3f16da Giannakos Filippos
                }
1362 9d3f16da Giannakos Filippos
                */
1363 7c130c7d Filippos Giannakos
        }
1364 7c130c7d Filippos Giannakos
1365 9d3f16da Giannakos Filippos
        if (mio->err){
1366 9d3f16da Giannakos Filippos
                r = -1;
1367 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Mio->err");
1368 9d3f16da Giannakos Filippos
                goto out;
1369 9d3f16da Giannakos Filippos
        }
1370 7c130c7d Filippos Giannakos
1371 9d3f16da Giannakos Filippos
        /* resize request to fit reply */
1372 9d3f16da Giannakos Filippos
        char buf[XSEG_MAX_TARGETLEN];
1373 9d3f16da Giannakos Filippos
        strncpy(buf, target, pr->req->targetlen);
1374 9d3f16da Giannakos Filippos
        r = xseg_resize_request(peer->xseg, pr->req, pr->req->targetlen, size);
1375 7c130c7d Filippos Giannakos
        if (r < 0) {
1376 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot resize request");
1377 9d3f16da Giannakos Filippos
                goto out;
1378 7c130c7d Filippos Giannakos
        }
1379 9d3f16da Giannakos Filippos
        target = xseg_get_target(peer->xseg, pr->req);
1380 9d3f16da Giannakos Filippos
        strncpy(target, buf, pr->req->targetlen);
1381 9d3f16da Giannakos Filippos
1382 9d3f16da Giannakos Filippos
        /* structure reply */
1383 9d3f16da Giannakos Filippos
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1384 9d3f16da Giannakos Filippos
        reply->cnt = nr_objs;
1385 9d3f16da Giannakos Filippos
        for (i = 0; i < (idx+1); i++) {
1386 9d3f16da Giannakos Filippos
                strncpy(reply->segs[i].target, mns[i].mn->object, mns[i].mn->objectlen);
1387 9d3f16da Giannakos Filippos
                reply->segs[i].targetlen = mns[i].mn->objectlen;
1388 9d3f16da Giannakos Filippos
                reply->segs[i].offset = mns[i].offset;
1389 9d3f16da Giannakos Filippos
                reply->segs[i].size = mns[i].size;
1390 40e56a42 Filippos Giannakos
        }
1391 9d3f16da Giannakos Filippos
out:
1392 9d3f16da Giannakos Filippos
        for (i = 0; i < idx; i++) {
1393 9d3f16da Giannakos Filippos
                put_mapnode(mns[i].mn);
1394 40e56a42 Filippos Giannakos
        }
1395 9d3f16da Giannakos Filippos
        free(mns);
1396 9d3f16da Giannakos Filippos
        return r;
1397 7c130c7d Filippos Giannakos
}
1398 7c130c7d Filippos Giannakos
1399 9d3f16da Giannakos Filippos
static int do_dropcache(struct peer_req *pr, struct map *map)
1400 7c130c7d Filippos Giannakos
{
1401 9d3f16da Giannakos Filippos
        struct map_node *mn;
1402 56ab0f5d Giannakos Filippos
        struct peerd *peer = pr->peer;
1403 56ab0f5d Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
1404 9d3f16da Giannakos Filippos
        uint64_t i;
1405 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Dropping cache for map %s", map->volume);
1406 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_DROPPING_CACHE;
1407 9d3f16da Giannakos Filippos
        for (i = 0; i < calc_map_obj(map); i++) {
1408 9d3f16da Giannakos Filippos
                mn = get_mapnode(map, i);
1409 9d3f16da Giannakos Filippos
                if (mn) {
1410 9d3f16da Giannakos Filippos
                        if (!(mn->flags & MF_OBJECT_DESTROYED)){
1411 9d3f16da Giannakos Filippos
                                //make sure all pending operations on all objects are completed
1412 9d3f16da Giannakos Filippos
                                if (mn->flags & MF_OBJECT_NOT_READY){
1413 9d3f16da Giannakos Filippos
                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1414 9d3f16da Giannakos Filippos
                                }
1415 9d3f16da Giannakos Filippos
                                mn->flags &= MF_OBJECT_DESTROYED;
1416 9d3f16da Giannakos Filippos
                        }
1417 9d3f16da Giannakos Filippos
                        put_mapnode(mn);
1418 9d3f16da Giannakos Filippos
                }
1419 7c130c7d Filippos Giannakos
        }
1420 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_DROPPING_CACHE;
1421 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_DESTROYED;
1422 56ab0f5d Giannakos Filippos
        remove_map(mapper, map);
1423 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Dropping cache for map %s completed", map->volume);
1424 9d3f16da Giannakos Filippos
        put_map(map);        // put map here to destroy it (matches m->ref = 1 on map create)
1425 42a0461c Giannakos Filippos
        return 0;
1426 d85c1e11 Filippos Giannakos
}
1427 d85c1e11 Filippos Giannakos
1428 9d3f16da Giannakos Filippos
static int do_info(struct peer_req *pr, struct map *map)
1429 d85c1e11 Filippos Giannakos
{
1430 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1431 9d3f16da Giannakos Filippos
        struct xseg_reply_info *xinfo = (struct xseg_reply_info *) xseg_get_data(peer->xseg, pr->req);
1432 9d3f16da Giannakos Filippos
        xinfo->size = map->size;
1433 d85c1e11 Filippos Giannakos
        return 0;
1434 9d3f16da Giannakos Filippos
}
1435 7c130c7d Filippos Giannakos
1436 d85c1e11 Filippos Giannakos
1437 9d3f16da Giannakos Filippos
static int do_close(struct peer_req *pr, struct map *map)
1438 9d3f16da Giannakos Filippos
{
1439 9d3f16da Giannakos Filippos
//        struct peerd *peer = pr->peer;
1440 9d3f16da Giannakos Filippos
//        struct xseg_request *req;
1441 9d3f16da Giannakos Filippos
        if (map->flags & MF_MAP_EXCLUSIVE) 
1442 9d3f16da Giannakos Filippos
                close_map(pr, map);
1443 9d3f16da Giannakos Filippos
        return do_dropcache(pr, map);
1444 7c130c7d Filippos Giannakos
}
1445 7c130c7d Filippos Giannakos
1446 9d3f16da Giannakos Filippos
static int do_destroy(struct peer_req *pr, struct map *map)
1447 7c130c7d Filippos Giannakos
{
1448 9d3f16da Giannakos Filippos
        uint64_t i;
1449 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1450 7c130c7d Filippos Giannakos
        struct mapper_io *mio = __get_mapper_io(pr);
1451 9d3f16da Giannakos Filippos
        struct map_node *mn;
1452 9d3f16da Giannakos Filippos
        struct xseg_request *req;
1453 99dad167 Giannakos Filippos
        
1454 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Destroying map %s", map->volume);
1455 9d3f16da Giannakos Filippos
        map->flags |= MF_MAP_DELETING;
1456 9d3f16da Giannakos Filippos
        req = delete_map(pr, map);
1457 9d3f16da Giannakos Filippos
        if (!req)
1458 9d3f16da Giannakos Filippos
                return -1;
1459 9d3f16da Giannakos Filippos
        wait_on_pr(pr, (!((req->state & XS_FAILED)||(req->state & XS_SERVED))));
1460 9d3f16da Giannakos Filippos
        if (req->state & XS_FAILED){
1461 9d3f16da Giannakos Filippos
                xseg_put_request(peer->xseg, req, pr->portno);
1462 9d3f16da Giannakos Filippos
                map->flags &= ~MF_MAP_DELETING;
1463 7c130c7d Filippos Giannakos
                return -1;
1464 7c130c7d Filippos Giannakos
        }
1465 9d3f16da Giannakos Filippos
        xseg_put_request(peer->xseg, req, pr->portno);
1466 9d3f16da Giannakos Filippos
        //FIXME
1467 9d3f16da Giannakos Filippos
        uint64_t nr_obj = calc_map_obj(map);
1468 9d3f16da Giannakos Filippos
        uint64_t deleted = 0;
1469 9d3f16da Giannakos Filippos
        while (deleted < nr_obj){ 
1470 9d3f16da Giannakos Filippos
                deleted = 0;
1471 56ab0f5d Giannakos Filippos
                for (i = 0; i < nr_obj; i++){
1472 9d3f16da Giannakos Filippos
                        mn = get_mapnode(map, i);
1473 9d3f16da Giannakos Filippos
                        if (mn) {
1474 9d3f16da Giannakos Filippos
                                if (!(mn->flags & MF_OBJECT_DESTROYED)){
1475 9d3f16da Giannakos Filippos
                                        if (mn->flags & MF_OBJECT_EXIST){
1476 9d3f16da Giannakos Filippos
                                                //make sure all pending operations on all objects are completed
1477 9d3f16da Giannakos Filippos
                                                if (mn->flags & MF_OBJECT_NOT_READY){
1478 9d3f16da Giannakos Filippos
                                                        wait_on_mapnode(mn, mn->flags & MF_OBJECT_NOT_READY);
1479 9d3f16da Giannakos Filippos
                                                }
1480 9d3f16da Giannakos Filippos
                                                req = delete_object(pr, mn);
1481 9d3f16da Giannakos Filippos
                                                if (!req)
1482 9d3f16da Giannakos Filippos
                                                        if (mio->del_pending){
1483 9d3f16da Giannakos Filippos
                                                                goto wait_pending;
1484 9d3f16da Giannakos Filippos
                                                        } else {
1485 9d3f16da Giannakos Filippos
                                                                continue;
1486 9d3f16da Giannakos Filippos
                                                        }
1487 9d3f16da Giannakos Filippos
                                                else {
1488 9d3f16da Giannakos Filippos
                                                        mio->del_pending++;
1489 9d3f16da Giannakos Filippos
                                                }
1490 9d3f16da Giannakos Filippos
                                        }
1491 9d3f16da Giannakos Filippos
                                        mn->flags &= MF_OBJECT_DESTROYED;
1492 9d3f16da Giannakos Filippos
                                }
1493 9d3f16da Giannakos Filippos
                                put_mapnode(mn);
1494 9d3f16da Giannakos Filippos
                        }
1495 9d3f16da Giannakos Filippos
                        deleted++;
1496 9d3f16da Giannakos Filippos
                }
1497 9d3f16da Giannakos Filippos
wait_pending:
1498 9d3f16da Giannakos Filippos
                mio->cb = deletion_cb;
1499 9d3f16da Giannakos Filippos
                wait_on_pr(pr, mio->del_pending > 0);
1500 40e56a42 Filippos Giannakos
        }
1501 9d3f16da Giannakos Filippos
        mio->cb = NULL;
1502 9d3f16da Giannakos Filippos
        map->flags &= ~MF_MAP_DELETING;
1503 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Destroyed map %s", map->volume);
1504 9d3f16da Giannakos Filippos
        return do_close(pr, map);
1505 9d3f16da Giannakos Filippos
}
1506 7c130c7d Filippos Giannakos
1507 9d3f16da Giannakos Filippos
static int do_mapr(struct peer_req *pr, struct map *map)
1508 9d3f16da Giannakos Filippos
{
1509 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1510 9d3f16da Giannakos Filippos
        int r = req2objs(pr, map, 0);
1511 9d3f16da Giannakos Filippos
        if  (r < 0){
1512 feeaaf17 Filippos Giannakos
                XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu failed",
1513 feeaaf17 Filippos Giannakos
                                map->volume, 
1514 feeaaf17 Filippos Giannakos
                                (unsigned long long) pr->req->offset, 
1515 feeaaf17 Filippos Giannakos
                                (unsigned long long) (pr->req->offset + pr->req->size));
1516 9d3f16da Giannakos Filippos
                return -1;
1517 40e56a42 Filippos Giannakos
        }
1518 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map r of map %s, range: %llu-%llu completed",
1519 9d3f16da Giannakos Filippos
                        map->volume, 
1520 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->offset, 
1521 9d3f16da Giannakos Filippos
                        (unsigned long long) (pr->req->offset + pr->req->size));
1522 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1523 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->offset,
1524 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->size);
1525 9d3f16da Giannakos Filippos
        char buf[XSEG_MAX_TARGETLEN+1];
1526 9d3f16da Giannakos Filippos
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1527 9d3f16da Giannakos Filippos
        int i;
1528 9d3f16da Giannakos Filippos
        for (i = 0; i < reply->cnt; i++) {
1529 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1530 9d3f16da Giannakos Filippos
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1531 9d3f16da Giannakos Filippos
                buf[reply->segs[i].targetlen] = 0;
1532 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1533 9d3f16da Giannakos Filippos
                                (unsigned long long) reply->segs[i].offset,
1534 9d3f16da Giannakos Filippos
                                (unsigned long long) reply->segs[i].size);
1535 feeaaf17 Filippos Giannakos
        }
1536 7c130c7d Filippos Giannakos
        return 0;
1537 7c130c7d Filippos Giannakos
}
1538 7c130c7d Filippos Giannakos
1539 9d3f16da Giannakos Filippos
static int do_mapw(struct peer_req *pr, struct map *map)
1540 7c130c7d Filippos Giannakos
{
1541 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1542 9d3f16da Giannakos Filippos
        int r = req2objs(pr, map, 1);
1543 9d3f16da Giannakos Filippos
        if  (r < 0){
1544 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu failed",
1545 9d3f16da Giannakos Filippos
                                map->volume, 
1546 9d3f16da Giannakos Filippos
                                (unsigned long long) pr->req->offset, 
1547 9d3f16da Giannakos Filippos
                                (unsigned long long) (pr->req->offset + pr->req->size));
1548 7c130c7d Filippos Giannakos
                return -1;
1549 7c130c7d Filippos Giannakos
        }
1550 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, I, "Map w of map %s, range: %llu-%llu completed",
1551 9d3f16da Giannakos Filippos
                        map->volume, 
1552 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->offset, 
1553 9d3f16da Giannakos Filippos
                        (unsigned long long) (pr->req->offset + pr->req->size));
1554 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, D, "Req->offset: %llu, req->size: %llu",
1555 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->offset,
1556 9d3f16da Giannakos Filippos
                        (unsigned long long) pr->req->size);
1557 9d3f16da Giannakos Filippos
        char buf[XSEG_MAX_TARGETLEN+1];
1558 9d3f16da Giannakos Filippos
        struct xseg_reply_map *reply = (struct xseg_reply_map *) xseg_get_data(peer->xseg, pr->req);
1559 9d3f16da Giannakos Filippos
        int i;
1560 9d3f16da Giannakos Filippos
        for (i = 0; i < reply->cnt; i++) {
1561 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "i: %d, reply->cnt: %u",i, reply->cnt);
1562 9d3f16da Giannakos Filippos
                strncpy(buf, reply->segs[i].target, reply->segs[i].targetlen);
1563 9d3f16da Giannakos Filippos
                buf[reply->segs[i].targetlen] = 0;
1564 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, D, "%d: Object: %s, offset: %llu, size: %llu", i, buf,
1565 9d3f16da Giannakos Filippos
                                (unsigned long long) reply->segs[i].offset,
1566 9d3f16da Giannakos Filippos
                                (unsigned long long) reply->segs[i].size);
1567 7c130c7d Filippos Giannakos
        }
1568 7c130c7d Filippos Giannakos
        return 0;
1569 7c130c7d Filippos Giannakos
}
1570 7c130c7d Filippos Giannakos
1571 9d3f16da Giannakos Filippos
//here map is the parent map
1572 9d3f16da Giannakos Filippos
static int do_clone(struct peer_req *pr, struct map *map)
1573 c04d4786 Filippos Giannakos
{
1574 9d3f16da Giannakos Filippos
        /*
1575 9d3f16da Giannakos Filippos
        FIXME check if clone map exists
1576 9d3f16da Giannakos Filippos
        clonemap = get_map(pr, target, targetlen, MF_LOAD);
1577 9d3f16da Giannakos Filippos
        if (clonemap)
1578 9d3f16da Giannakos Filippos
                do_dropcache(pr, clonemap); // drop map here, rely on get_map_function to drop
1579 9d3f16da Giannakos Filippos
                                        //        cache on non-exclusive opens or declare a NO_CACHE flag ?
1580 9d3f16da Giannakos Filippos
                return -1;
1581 9d3f16da Giannakos Filippos
        */
1582 9d3f16da Giannakos Filippos
1583 9d3f16da Giannakos Filippos
        int r;
1584 99dad167 Giannakos Filippos
        char buf[XSEG_MAX_TARGETLEN];
1585 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1586 c04d4786 Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
1587 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1588 9d3f16da Giannakos Filippos
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1589 99dad167 Giannakos Filippos
        XSEGLOG2(&lc, I, "Cloning map %s", map->volume);
1590 9d3f16da Giannakos Filippos
        struct map *clonemap = create_map(mapper, target, pr->req->targetlen);
1591 9d3f16da Giannakos Filippos
        if (!clonemap) 
1592 9d3f16da Giannakos Filippos
                return -1;
1593 c04d4786 Filippos Giannakos
1594 9d3f16da Giannakos Filippos
        if (xclone->size == -1)
1595 9d3f16da Giannakos Filippos
                clonemap->size = map->size;
1596 9d3f16da Giannakos Filippos
        else
1597 9d3f16da Giannakos Filippos
                clonemap->size = xclone->size;
1598 9d3f16da Giannakos Filippos
        if (clonemap->size < map->size){
1599 9d3f16da Giannakos Filippos
                target = xseg_get_target(peer->xseg, pr->req);
1600 99dad167 Giannakos Filippos
                strncpy(buf, target, pr->req->targetlen);
1601 99dad167 Giannakos Filippos
                buf[pr->req->targetlen] = 0;
1602 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, W, "Requested clone size (%llu) < map size (%llu)"
1603 9d3f16da Giannakos Filippos
                                "\n\t for requested clone %s",
1604 9d3f16da Giannakos Filippos
                                (unsigned long long) xclone->size,
1605 9d3f16da Giannakos Filippos
                                (unsigned long long) map->size, buf);
1606 9d3f16da Giannakos Filippos
                goto out_err;
1607 c04d4786 Filippos Giannakos
        }
1608 9d3f16da Giannakos Filippos
        
1609 9d3f16da Giannakos Filippos
        //alloc and init map_nodes
1610 9d3f16da Giannakos Filippos
        unsigned long c = clonemap->size/block_size + 1;
1611 9d3f16da Giannakos Filippos
        struct map_node *map_nodes = calloc(c, sizeof(struct map_node));
1612 9d3f16da Giannakos Filippos
        if (!map_nodes){
1613 c04d4786 Filippos Giannakos
                goto out_err;
1614 9d3f16da Giannakos Filippos
        }
1615 9d3f16da Giannakos Filippos
        int i;
1616 9d3f16da Giannakos Filippos
        for (i = 0; i < clonemap->size/block_size + 1; i++) {
1617 9d3f16da Giannakos Filippos
                struct map_node *mn = get_mapnode(map, i);
1618 9d3f16da Giannakos Filippos
                if (mn) {
1619 9d3f16da Giannakos Filippos
                        strncpy(map_nodes[i].object, mn->object, mn->objectlen);
1620 9d3f16da Giannakos Filippos
                        map_nodes[i].objectlen = mn->objectlen;
1621 9d3f16da Giannakos Filippos
                        put_mapnode(mn);
1622 9d3f16da Giannakos Filippos
                } else {
1623 9d3f16da Giannakos Filippos
                        strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1624 9d3f16da Giannakos Filippos
                        map_nodes[i].objectlen = strlen(zero_block);
1625 9d3f16da Giannakos Filippos
                }
1626 9d3f16da Giannakos Filippos
                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1627 9d3f16da Giannakos Filippos
                map_nodes[i].flags = 0;
1628 9d3f16da Giannakos Filippos
                map_nodes[i].objectidx = i;
1629 9d3f16da Giannakos Filippos
                map_nodes[i].map = clonemap;
1630 9d3f16da Giannakos Filippos
                map_nodes[i].ref = 1;
1631 9d3f16da Giannakos Filippos
                map_nodes[i].waiters = 0;
1632 9d3f16da Giannakos Filippos
                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1633 9d3f16da Giannakos Filippos
                r = insert_object(clonemap, &map_nodes[i]);
1634 9d3f16da Giannakos Filippos
                if (r < 0){
1635 99dad167 Giannakos Filippos
                        XSEGLOG2(&lc, E, "Cannot insert object %d to map %s", i, clonemap->volume);
1636 9d3f16da Giannakos Filippos
                        goto out_err;
1637 9d3f16da Giannakos Filippos
                }
1638 9d3f16da Giannakos Filippos
        }
1639 9d3f16da Giannakos Filippos
        r = write_map(pr, clonemap);
1640 9d3f16da Giannakos Filippos
        if (r < 0){
1641 9d3f16da Giannakos Filippos
                XSEGLOG2(&lc, E, "Cannot write map %s", clonemap->volume);
1642 9d3f16da Giannakos Filippos
                goto out_err;
1643 9d3f16da Giannakos Filippos
        }
1644 9d3f16da Giannakos Filippos
        return 0;
1645 c04d4786 Filippos Giannakos
1646 c04d4786 Filippos Giannakos
out_err:
1647 9d3f16da Giannakos Filippos
        put_map(clonemap);
1648 c04d4786 Filippos Giannakos
        return -1;
1649 c04d4786 Filippos Giannakos
}
1650 b7c43e7b Giannakos Filippos
1651 9d3f16da Giannakos Filippos
static int open_load_map(struct peer_req *pr, struct map *map, uint32_t flags)
1652 b7c43e7b Giannakos Filippos
{
1653 9d3f16da Giannakos Filippos
        int r, opened = 0;
1654 56ab0f5d Giannakos Filippos
retry_open:
1655 9d3f16da Giannakos Filippos
        if (flags & MF_EXCLUSIVE){
1656 9d3f16da Giannakos Filippos
                r = open_map(pr, map);
1657 9d3f16da Giannakos Filippos
                if (r < 0) {
1658 2979e589 Filippos Giannakos
                        if (flags & MF_FORCE){
1659 2979e589 Filippos Giannakos
1660 56ab0f5d Giannakos Filippos
                                goto retry_open;
1661 2979e589 Filippos Giannakos
                        }
1662 b7c43e7b Giannakos Filippos
                } else {
1663 9d3f16da Giannakos Filippos
                        opened = 1;
1664 b7c43e7b Giannakos Filippos
                }
1665 b7c43e7b Giannakos Filippos
        }
1666 9d3f16da Giannakos Filippos
        r = load_map(pr, map);
1667 9d3f16da Giannakos Filippos
        if (r < 0 && opened){
1668 9d3f16da Giannakos Filippos
                close_map(pr, map);
1669 9d3f16da Giannakos Filippos
        }
1670 9d3f16da Giannakos Filippos
        return r;
1671 b7c43e7b Giannakos Filippos
}
1672 b7c43e7b Giannakos Filippos
1673 9d3f16da Giannakos Filippos
struct map * get_map(struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1674 c04d4786 Filippos Giannakos
{
1675 f60debc1 Giannakos Filippos
        int r;
1676 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1677 9d3f16da Giannakos Filippos
        struct mapperd *mapper = __get_mapperd(peer);
1678 9d3f16da Giannakos Filippos
        struct map *map = find_map(mapper, name, namelen);
1679 d0e9361a Giannakos Filippos
        if (!map){
1680 d0e9361a Giannakos Filippos
                if (flags & MF_LOAD){
1681 d0e9361a Giannakos Filippos
                        map = create_map(mapper, name, namelen);
1682 bfdf2b2c Giannakos Filippos
                        if (!map)
1683 bfdf2b2c Giannakos Filippos
                                return NULL;
1684 d0e9361a Giannakos Filippos
                        r = open_load_map(pr, map, flags);
1685 d0e9361a Giannakos Filippos
                        if (r < 0){
1686 d0e9361a Giannakos Filippos
                                do_dropcache(pr, map);
1687 d0e9361a Giannakos Filippos
                                return NULL;
1688 d0e9361a Giannakos Filippos
                        }
1689 d0e9361a Giannakos Filippos
                } else {
1690 9d3f16da Giannakos Filippos
                        return NULL;
1691 c04d4786 Filippos Giannakos
                }
1692 9d3f16da Giannakos Filippos
        } else if (map->flags & MF_MAP_DESTROYED){
1693 9d3f16da Giannakos Filippos
                return NULL;
1694 c04d4786 Filippos Giannakos
        }
1695 9d3f16da Giannakos Filippos
        __get_map(map);
1696 9d3f16da Giannakos Filippos
        return map;
1697 9d3f16da Giannakos Filippos
1698 c04d4786 Filippos Giannakos
}
1699 c04d4786 Filippos Giannakos
1700 2979e589 Filippos Giannakos
static int map_action(int (action)(struct peer_req *pr, struct map *map),
1701 9d3f16da Giannakos Filippos
                struct peer_req *pr, char *name, uint32_t namelen, uint32_t flags)
1702 c04d4786 Filippos Giannakos
{
1703 9d3f16da Giannakos Filippos
        //struct peerd *peer = pr->peer;
1704 9d3f16da Giannakos Filippos
        struct map *map;
1705 9d3f16da Giannakos Filippos
start:
1706 9d3f16da Giannakos Filippos
        map = get_map(pr, name, namelen, flags);
1707 9d3f16da Giannakos Filippos
        if (!map)
1708 9d3f16da Giannakos Filippos
                return -1;
1709 9d3f16da Giannakos Filippos
        if (map->flags & MF_MAP_NOT_READY){
1710 9d3f16da Giannakos Filippos
                wait_on_map(map, (map->flags & MF_MAP_NOT_READY));
1711 9d3f16da Giannakos Filippos
                put_map(map);
1712 9d3f16da Giannakos Filippos
                goto start;
1713 9d3f16da Giannakos Filippos
        }
1714 9d3f16da Giannakos Filippos
        int r = action(pr, map);
1715 9d3f16da Giannakos Filippos
        //always drop cache if map not read exclusively
1716 9d3f16da Giannakos Filippos
        if (!(map->flags & MF_MAP_EXCLUSIVE))
1717 9d3f16da Giannakos Filippos
                do_dropcache(pr, map);
1718 9d3f16da Giannakos Filippos
        //maybe capture ref before and compare here?
1719 9d3f16da Giannakos Filippos
        if (map->ref > 1){
1720 9d3f16da Giannakos Filippos
                signal_map(map);
1721 9d3f16da Giannakos Filippos
        }
1722 9d3f16da Giannakos Filippos
        put_map(map);
1723 9d3f16da Giannakos Filippos
        return r;
1724 9d3f16da Giannakos Filippos
}
1725 c04d4786 Filippos Giannakos
1726 9d3f16da Giannakos Filippos
void * handle_info(struct peer_req *pr)
1727 9d3f16da Giannakos Filippos
{
1728 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1729 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1730 9d3f16da Giannakos Filippos
        int r = map_action(do_info, pr, target, pr->req->targetlen, MF_LOAD);
1731 c04d4786 Filippos Giannakos
        if (r < 0)
1732 9d3f16da Giannakos Filippos
                fail(peer, pr);
1733 9d3f16da Giannakos Filippos
        else
1734 9d3f16da Giannakos Filippos
                complete(peer, pr);
1735 9d3f16da Giannakos Filippos
        ta--;
1736 9d3f16da Giannakos Filippos
        return NULL;
1737 c04d4786 Filippos Giannakos
}
1738 c04d4786 Filippos Giannakos
1739 9d3f16da Giannakos Filippos
void * handle_clone(struct peer_req *pr)
1740 c04d4786 Filippos Giannakos
{
1741 f4b85f41 Giannakos Filippos
        int r;
1742 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1743 9d3f16da Giannakos Filippos
        struct xseg_request_clone *xclone = (struct xseg_request_clone *) xseg_get_data(peer->xseg, pr->req);
1744 9d3f16da Giannakos Filippos
        if (!xclone) {
1745 9d3f16da Giannakos Filippos
                r = -1;
1746 9d3f16da Giannakos Filippos
                goto out;
1747 9d3f16da Giannakos Filippos
        }
1748 9d3f16da Giannakos Filippos
        if (xclone->targetlen){
1749 9d3f16da Giannakos Filippos
                r = map_action(do_clone, pr, xclone->target, xclone->targetlen, MF_LOAD);
1750 c04d4786 Filippos Giannakos
        } else {
1751 efb7bdb0 Giannakos Filippos
                if (!xclone->size){
1752 9d3f16da Giannakos Filippos
                        r = -1;
1753 9d3f16da Giannakos Filippos
                } else {
1754 126c0407 Giannakos Filippos
                        struct map *map;
1755 126c0407 Giannakos Filippos
                        char *target = xseg_get_target(peer->xseg, pr->req);
1756 126c0407 Giannakos Filippos
                        XSEGLOG2(&lc, I, "Creating volume");
1757 126c0407 Giannakos Filippos
                        map = get_map(pr, target, pr->req->targetlen, MF_LOAD);
1758 126c0407 Giannakos Filippos
                        if (map){
1759 126c0407 Giannakos Filippos
                                XSEGLOG2(&lc, E, "Volume %s exists", map->volume);
1760 4fb9bce5 Giannakos Filippos
                                if (map->ref <= 2) //initial one + one ref from __get_map
1761 4fb9bce5 Giannakos Filippos
                                        do_dropcache(pr, map); //we are the only ones usining this map. Drop the cache. 
1762 4fb9bce5 Giannakos Filippos
                                put_map(map); //matches get_map
1763 126c0407 Giannakos Filippos
                                r = -1;
1764 126c0407 Giannakos Filippos
                                goto out;
1765 126c0407 Giannakos Filippos
                        }
1766 9d3f16da Giannakos Filippos
                        //create a new empty map of size
1767 126c0407 Giannakos Filippos
                        map = create_map(mapper, target, pr->req->targetlen);
1768 126c0407 Giannakos Filippos
                        if (!map){
1769 126c0407 Giannakos Filippos
                                r = -1;
1770 126c0407 Giannakos Filippos
                                goto out;
1771 126c0407 Giannakos Filippos
                        }
1772 efb7bdb0 Giannakos Filippos
                        map->size = xclone->size;
1773 9d3f16da Giannakos Filippos
                        //populate_map with zero objects;
1774 efb7bdb0 Giannakos Filippos
                        uint64_t nr_objs = xclone->size / block_size;
1775 efb7bdb0 Giannakos Filippos
                        if (xclone->size % block_size)
1776 126c0407 Giannakos Filippos
                                nr_objs++;
1777 efb7bdb0 Giannakos Filippos
                                
1778 efb7bdb0 Giannakos Filippos
                        struct map_node *map_nodes = calloc(nr_objs, sizeof(struct map_node));
1779 efb7bdb0 Giannakos Filippos
                        if (!map_nodes){
1780 4fb9bce5 Giannakos Filippos
                                do_dropcache(pr, map); //Since we just created the map, dropping cache should be sufficient.
1781 efb7bdb0 Giannakos Filippos
                                r = -1;
1782 efb7bdb0 Giannakos Filippos
                                goto out;
1783 efb7bdb0 Giannakos Filippos
                        }
1784 126c0407 Giannakos Filippos
                        uint64_t i;
1785 126c0407 Giannakos Filippos
                        for (i = 0; i < nr_objs; i++) {
1786 efb7bdb0 Giannakos Filippos
                                strncpy(map_nodes[i].object, zero_block, strlen(zero_block)); //this should be SHA256_DIGEST_SIZE *2
1787 efb7bdb0 Giannakos Filippos
                                map_nodes[i].objectlen = strlen(zero_block);
1788 efb7bdb0 Giannakos Filippos
                                map_nodes[i].object[map_nodes[i].objectlen] = 0; //NULL terminate
1789 efb7bdb0 Giannakos Filippos
                                map_nodes[i].flags = 0;
1790 efb7bdb0 Giannakos Filippos
                                map_nodes[i].objectidx = i;
1791 efb7bdb0 Giannakos Filippos
                                map_nodes[i].map = map;
1792 efb7bdb0 Giannakos Filippos
                                map_nodes[i].ref = 1;
1793 efb7bdb0 Giannakos Filippos
                                map_nodes[i].waiters = 0;
1794 efb7bdb0 Giannakos Filippos
                                map_nodes[i].cond = st_cond_new(); //FIXME errcheck;
1795 efb7bdb0 Giannakos Filippos
                                r = insert_object(map, &map_nodes[i]);
1796 efb7bdb0 Giannakos Filippos
                                if (r < 0){
1797 efb7bdb0 Giannakos Filippos
                                        do_dropcache(pr, map);
1798 efb7bdb0 Giannakos Filippos
                                        r = -1;
1799 efb7bdb0 Giannakos Filippos
                                        goto out;
1800 efb7bdb0 Giannakos Filippos
                                }
1801 126c0407 Giannakos Filippos
                        }
1802 126c0407 Giannakos Filippos
                        r = write_map(pr, map);
1803 126c0407 Giannakos Filippos
                        if (r < 0){
1804 126c0407 Giannakos Filippos
                                XSEGLOG2(&lc, E, "Cannot write map %s", map->volume);
1805 efb7bdb0 Giannakos Filippos
                                do_dropcache(pr, map);
1806 efb7bdb0 Giannakos Filippos
                                goto out;
1807 126c0407 Giannakos Filippos
                        }
1808 126c0407 Giannakos Filippos
                        XSEGLOG2(&lc, I, "Volume %s created", map->volume);
1809 126c0407 Giannakos Filippos
                        r = 0;
1810 4fb9bce5 Giannakos Filippos
                        do_dropcache(pr, map); //drop cache here for consistency
1811 f60debc1 Giannakos Filippos
                }
1812 c04d4786 Filippos Giannakos
        }
1813 9d3f16da Giannakos Filippos
out:
1814 9d3f16da Giannakos Filippos
        if (r < 0)
1815 9d3f16da Giannakos Filippos
                fail(peer, pr);
1816 9d3f16da Giannakos Filippos
        else
1817 9d3f16da Giannakos Filippos
                complete(peer, pr);
1818 9d3f16da Giannakos Filippos
        ta--;
1819 9d3f16da Giannakos Filippos
        return NULL;
1820 c04d4786 Filippos Giannakos
}
1821 c04d4786 Filippos Giannakos
1822 9d3f16da Giannakos Filippos
void * handle_mapr(struct peer_req *pr)
1823 c04d4786 Filippos Giannakos
{
1824 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1825 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1826 9d3f16da Giannakos Filippos
        int r = map_action(do_mapr, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE);
1827 9d3f16da Giannakos Filippos
        if (r < 0)
1828 9d3f16da Giannakos Filippos
                fail(peer, pr);
1829 9d3f16da Giannakos Filippos
        else
1830 9d3f16da Giannakos Filippos
                complete(peer, pr);
1831 9d3f16da Giannakos Filippos
        ta--;
1832 9d3f16da Giannakos Filippos
        return NULL;
1833 c04d4786 Filippos Giannakos
}
1834 c04d4786 Filippos Giannakos
1835 9d3f16da Giannakos Filippos
void * handle_mapw(struct peer_req *pr)
1836 7c130c7d Filippos Giannakos
{
1837 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1838 6af5c5c6 Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1839 9d3f16da Giannakos Filippos
        int r = map_action(do_mapw, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1840 9d3f16da Giannakos Filippos
        if (r < 0)
1841 c04d4786 Filippos Giannakos
                fail(peer, pr);
1842 9d3f16da Giannakos Filippos
        else
1843 9d3f16da Giannakos Filippos
                complete(peer, pr);
1844 9d3f16da Giannakos Filippos
        XSEGLOG2(&lc, D, "Ta: %d", ta);
1845 9d3f16da Giannakos Filippos
        ta--;
1846 9d3f16da Giannakos Filippos
        return NULL;
1847 7c130c7d Filippos Giannakos
}
1848 7c130c7d Filippos Giannakos
1849 9d3f16da Giannakos Filippos
void * handle_destroy(struct peer_req *pr)
1850 9f27419f Giannakos Filippos
{
1851 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1852 9f27419f Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1853 9d3f16da Giannakos Filippos
        int r = map_action(do_destroy, pr, target, pr->req->targetlen, MF_LOAD|MF_EXCLUSIVE|MF_FORCE);
1854 9d3f16da Giannakos Filippos
        if (r < 0)
1855 9f27419f Giannakos Filippos
                fail(peer, pr);
1856 9d3f16da Giannakos Filippos
        else
1857 9f27419f Giannakos Filippos
                complete(peer, pr);
1858 9d3f16da Giannakos Filippos
        ta--;
1859 9d3f16da Giannakos Filippos
        return NULL;
1860 9d3f16da Giannakos Filippos
}
1861 9f27419f Giannakos Filippos
1862 9d3f16da Giannakos Filippos
void * handle_close(struct peer_req *pr)
1863 9d3f16da Giannakos Filippos
{
1864 9d3f16da Giannakos Filippos
        struct peerd *peer = pr->peer;
1865 9d3f16da Giannakos Filippos
        char *target = xseg_get_target(peer->xseg, pr->req);
1866 9d3f16da Giannakos Filippos
        //here we do not want to load
1867 9d3f16da Giannakos Filippos
        int r = map_action(do_close, pr, target, pr->req->targetlen, MF_EXCLUSIVE|MF_FORCE);
1868 9d3f16da Giannakos Filippos
        if (r < 0)
1869 9d3f16da Giannakos Filippos
                fail(peer, pr);
1870 9d3f16da Giannakos Filippos
        else
1871 9d3f16da Giannakos Filippos
                complete(peer, pr);
1872 9d3f16da Giannakos Filippos
        ta--;
1873 9d3f16da Giannakos Filippos
        return NULL;
1874 9d3f16da Giannakos Filippos
}
1875 9f27419f Giannakos Filippos
1876 9d3f16da Giannakos Filippos
int dispatch_accepted(struct peerd *peer, struct peer_req *pr, 
1877 9d3f16da Giannakos Filippos
                        struct xseg_request *req)
1878 9d3f16da Giannakos Filippos
{
1879 9d3f16da Giannakos Filippos
        //struct mapperd *mapper = __get_mapperd(peer);
1880 9d3f16da Giannakos Filippos
        struct mapper_io *mio = __get_mapper_io(pr);
1881 9d3f16da Giannakos Filippos
        void *(*action)(struct peer_req *) = NULL;
1882 9f27419f Giannakos Filippos
1883 9d3f16da Giannakos Filippos
        mio->state = ACCEPTED;
1884 9d3f16da Giannakos Filippos
        mio->err = 0;
1885 9d3f16da Giannakos Filippos
        mio->cb = NULL;
1886 9d3f16da Giannakos Filippos
        switch (pr->req->op) {
1887 9d3f16da Giannakos Filippos
                /* primary xseg operations of mapper */
1888 9d3f16da Giannakos Filippos
                case X_CLONE: action = handle_clone; break;
1889 9d3f16da Giannakos Filippos
                case X_MAPR: action = handle_mapr; break;
1890 9d3f16da Giannakos Filippos
                case X_MAPW: action = handle_mapw; break;
1891 9d3f16da Giannakos Filippos
//                case X_SNAPSHOT: handle_snap(peer, pr, req); break;
1892 9d3f16da Giannakos Filippos
                case X_INFO: action = handle_info; break;
1893 9d3f16da Giannakos Filippos
                case X_DELETE: action = handle_destroy; break;
1894 9d3f16da Giannakos Filippos
                case X_CLOSE: action = handle_close; break;
1895 9d3f16da Giannakos Filippos
                default: fprintf(stderr, "mydispatch: unknown up\n"); break;
1896 9d3f16da Giannakos Filippos
        }
1897 9d3f16da Giannakos Filippos
        if (action){
1898 9d3f16da Giannakos Filippos
                ta++;
1899 9d3f16da Giannakos Filippos
                mio->active = 1;
1900 9d3f16da Giannakos Filippos
                st_thread_create(action, pr, 0, 0);
1901 9d3f16da Giannakos Filippos
        }
1902 9f27419f Giannakos Filippos
        return 0;
1903 9d3f16da Giannakos Filippos
1904 9f27419f Giannakos Filippos
}
1905 9f27419f Giannakos Filippos
1906 8fdb763c Giannakos Filippos
int dispatch(struct peerd *peer, struct peer_req *pr, struct xseg_request *req,
1907 8fdb763c Giannakos Filippos
                enum dispatch_reason reason)
1908 7c130c7d Filippos Giannakos
{
1909 7c130c7d Filippos Giannakos
        struct mapperd *mapper = __get_mapperd(peer);
1910 7c130c7d Filippos Giannakos
        (void) mapper;
1911 7c130c7d Filippos Giannakos
        struct mapper_io *mio = __get_mapper_io(pr);
1912 7c130c7d Filippos Giannakos
        (void) mio;
1913 7c130c7d Filippos Giannakos
1914 7c130c7d Filippos Giannakos
1915 9d3f16da Giannakos Filippos
        if (reason == dispatch_accept)
1916 9d3f16da Giannakos Filippos
                dispatch_accepted(peer, pr, req);
1917 9d3f16da Giannakos Filippos
        else {
1918 9d3f16da Giannakos Filippos
                if (mio->cb){
1919 9d3f16da Giannakos Filippos
                        mio->cb(pr, req);
1920 9d3f16da Giannakos Filippos
                } else { 
1921 9d3f16da Giannakos Filippos
                        signal_pr(pr);
1922 9d3f16da Giannakos Filippos
                }
1923 7c130c7d Filippos Giannakos
        }
1924 7c130c7d Filippos Giannakos
        return 0;
1925 7c130c7d Filippos Giannakos
}
1926 7c130c7d Filippos Giannakos
1927 ca18b3c1 Filippos Giannakos
int custom_peer_init(struct peerd *peer, int argc, char *argv[])
1928 7c130c7d Filippos Giannakos
{
1929 7c130c7d Filippos Giannakos
        int i;
1930 5ad213f0 Filippos Giannakos
        unsigned char buf[SHA256_DIGEST_SIZE];
1931 99dad167 Giannakos Filippos
        unsigned char *zero;
1932 6805c197 Giannakos Filippos
1933 6805c197 Giannakos Filippos
        gcry_control (GCRYCTL_SET_THREAD_CBS, &gcry_threads_pthread);
1934 6805c197 Giannakos Filippos
1935 6805c197 Giannakos Filippos
               /* Version check should be the very first call because it
1936 6805c197 Giannakos Filippos
          makes sure that important subsystems are intialized. */
1937 6805c197 Giannakos Filippos
               gcry_check_version (NULL);
1938 6805c197 Giannakos Filippos
     
1939 6805c197 Giannakos Filippos
               /* Disable secure memory.  */
1940 6805c197 Giannakos Filippos
               gcry_control (GCRYCTL_DISABLE_SECMEM, 0);
1941 6805c197 Giannakos Filippos
     
1942 6805c197 Giannakos Filippos
               /* Tell Libgcrypt that initialization has completed. */
1943 6805c197 Giannakos Filippos
               gcry_control (GCRYCTL_INITIALIZATION_FINISHED, 0);
1944 6805c197 Giannakos Filippos
1945 40e56a42 Filippos Giannakos
        /* calculate out magic sha hash value */
1946 6805c197 Giannakos Filippos
        gcry_md_hash_buffer(GCRY_MD_SHA256, magic_sha256, magic_string, strlen(magic_string));
1947 7c130c7d Filippos Giannakos
1948 40e56a42 Filippos Giannakos
        /* calculate zero block */
1949 40e56a42 Filippos Giannakos
        //FIXME check hash value
1950 5ad213f0 Filippos Giannakos
        zero = malloc(block_size);
1951 5ad213f0 Filippos Giannakos
        memset(zero, 0, block_size);
1952 6805c197 Giannakos Filippos
        gcry_md_hash_buffer(GCRY_MD_SHA256, buf, zero, block_size);
1953 7c130c7d Filippos Giannakos
        for (i = 0; i < SHA256_DIGEST_SIZE; ++i)
1954 5ad213f0 Filippos Giannakos
                sprintf(zero_block + 2*i, "%02x", buf[i]);
1955 5ad213f0 Filippos Giannakos
        printf("%s \n", zero_block);
1956 6805c197 Giannakos Filippos
        free(zero);
1957 7c130c7d Filippos Giannakos
1958 40e56a42 Filippos Giannakos
        //FIXME error checks
1959 9d3f16da Giannakos Filippos
        struct mapperd *mapperd = malloc(sizeof(struct mapperd));
1960 9d3f16da Giannakos Filippos
        peer->priv = mapperd;
1961 9d3f16da Giannakos Filippos
        mapper = mapperd;
1962 7c130c7d Filippos Giannakos
        mapper->hashmaps = xhash_new(3, STRING);
1963 7c130c7d Filippos Giannakos
        
1964 7c130c7d Filippos Giannakos
        for (i = 0; i < peer->nr_ops; i++) {
1965 7c130c7d Filippos Giannakos
                struct mapper_io *mio = malloc(sizeof(struct mapper_io));
1966 7c130c7d Filippos Giannakos
                mio->copyups_nodes = xhash_new(3, INTEGER);
1967 7c130c7d Filippos Giannakos
                mio->copyups = 0;
1968 7c130c7d Filippos Giannakos
                mio->err = 0;
1969 9d3f16da Giannakos Filippos
                mio->active = 0;
1970 7c130c7d Filippos Giannakos
                peer->peer_reqs[i].priv = mio;
1971 7c130c7d Filippos Giannakos
        }
1972 7c130c7d Filippos Giannakos
1973 7c130c7d Filippos Giannakos
        for (i = 0; i < argc; i++) {
1974 7c130c7d Filippos Giannakos
                if (!strcmp(argv[i], "-bp") && (i+1) < argc){
1975 7c130c7d Filippos Giannakos
                        mapper->bportno = atoi(argv[i+1]);
1976 7c130c7d Filippos Giannakos
                        i += 1;
1977 7c130c7d Filippos Giannakos
                        continue;
1978 7c130c7d Filippos Giannakos
                }
1979 e155e2f9 Giannakos Filippos
                if (!strcmp(argv[i], "-mbp") && (i+1) < argc){
1980 e155e2f9 Giannakos Filippos
                        mapper->mbportno = atoi(argv[i+1]);
1981 e155e2f9 Giannakos Filippos
                        i += 1;
1982 e155e2f9 Giannakos Filippos
                        continue;
1983 e155e2f9 Giannakos Filippos
                }
1984 40e56a42 Filippos Giannakos
                /* enforce only one thread */
1985 40e56a42 Filippos Giannakos
                if (!strcmp(argv[i], "-t") && (i+1) < argc){
1986 40e56a42 Filippos Giannakos
                        int t = atoi(argv[i+1]);
1987 40e56a42 Filippos Giannakos
                        if (t != 1) {
1988 40e56a42 Filippos Giannakos
                                printf("ERROR: mapperd supports only one thread for the moment\nExiting ...\n");
1989 40e56a42 Filippos Giannakos
                                return -1;
1990 40e56a42 Filippos Giannakos
                        }
1991 40e56a42 Filippos Giannakos
                        i += 1;
1992 40e56a42 Filippos Giannakos
                        continue;
1993 40e56a42 Filippos Giannakos
                }
1994 7c130c7d Filippos Giannakos
        }
1995 7c130c7d Filippos Giannakos
1996 b911a183 Giannakos Filippos
        const struct sched_param param = { .sched_priority = 99 };
1997 b911a183 Giannakos Filippos
        sched_setscheduler(syscall(SYS_gettid), SCHED_FIFO, &param);
1998 b911a183 Giannakos Filippos
1999 b911a183 Giannakos Filippos
2000 65062a91 Filippos Giannakos
//        test_map(peer);
2001 7c130c7d Filippos Giannakos
2002 7c130c7d Filippos Giannakos
        return 0;
2003 7c130c7d Filippos Giannakos
}
2004 5ad213f0 Filippos Giannakos
2005 5ad213f0 Filippos Giannakos
void print_obj(struct map_node *mn)
2006 5ad213f0 Filippos Giannakos
{
2007 ca18b3c1 Filippos Giannakos
        fprintf(stderr, "[%llu]object name: %s[%u] exists: %c\n", 
2008 ca18b3c1 Filippos Giannakos
                        (unsigned long long) mn->objectidx, mn->object, 
2009 ca18b3c1 Filippos Giannakos
                        (unsigned int) mn->objectlen, 
2010 5ad213f0 Filippos Giannakos
                        (mn->flags & MF_OBJECT_EXIST) ? 'y' : 'n');
2011 5ad213f0 Filippos Giannakos
}
2012 5ad213f0 Filippos Giannakos
2013 5ad213f0 Filippos Giannakos
void print_map(struct map *m)
2014 5ad213f0 Filippos Giannakos
{
2015 5ad213f0 Filippos Giannakos
        uint64_t nr_objs = m->size/block_size;
2016 5ad213f0 Filippos Giannakos
        if (m->size % block_size)
2017 5ad213f0 Filippos Giannakos
                nr_objs++;
2018 f4b85f41 Giannakos Filippos
        fprintf(stderr, "Volume name: %s[%u], size: %llu, nr_objs: %llu\n", 
2019 ca18b3c1 Filippos Giannakos
                        m->volume, m->volumelen, 
2020 ca18b3c1 Filippos Giannakos
                        (unsigned long long) m->size, 
2021 ca18b3c1 Filippos Giannakos
                        (unsigned long long) nr_objs);
2022 5ad213f0 Filippos Giannakos
        uint64_t i;
2023 5ad213f0 Filippos Giannakos
        struct map_node *mn;
2024 40e56a42 Filippos Giannakos
        if (nr_objs > 1000000) //FIXME to protect against invalid volume size
2025 5ad213f0 Filippos Giannakos
                return;
2026 5ad213f0 Filippos Giannakos
        for (i = 0; i < nr_objs; i++) {
2027 5ad213f0 Filippos Giannakos
                mn = find_object(m, i);
2028 40e56a42 Filippos Giannakos
                if (!mn){
2029 ca18b3c1 Filippos Giannakos
                        printf("object idx [%llu] not found!\n", (unsigned long long) i);
2030 5ad213f0 Filippos Giannakos
                        continue;
2031 40e56a42 Filippos Giannakos
                }
2032 5ad213f0 Filippos Giannakos
                print_obj(mn);
2033 5ad213f0 Filippos Giannakos
        }
2034 5ad213f0 Filippos Giannakos
}
2035 5ad213f0 Filippos Giannakos
2036 65062a91 Filippos Giannakos
/*
2037 5ad213f0 Filippos Giannakos
void test_map(struct peerd *peer)
2038 5ad213f0 Filippos Giannakos
{
2039 5ad213f0 Filippos Giannakos
        int i,j, ret;
2040 6805c197 Giannakos Filippos
        //struct sha256_ctx sha256ctx;
2041 5ad213f0 Filippos Giannakos
        unsigned char buf[SHA256_DIGEST_SIZE];
2042 65062a91 Filippos Giannakos
        char buf_new[XSEG_MAX_TARGETLEN + 20];
2043 5ad213f0 Filippos Giannakos
        struct map *m = malloc(sizeof(struct map));
2044 65062a91 Filippos Giannakos
        strncpy(m->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN + 1);
2045 65062a91 Filippos Giannakos
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2046 65062a91 Filippos Giannakos
        strncpy(buf_new, m->volume, XSEG_MAX_TARGETLEN);
2047 65062a91 Filippos Giannakos
        buf_new[XSEG_MAX_TARGETLEN + 19] = 0;
2048 65062a91 Filippos Giannakos
        m->volumelen = XSEG_MAX_TARGETLEN;
2049 5ad213f0 Filippos Giannakos
        m->size = 100*block_size;
2050 5ad213f0 Filippos Giannakos
        m->objects = xhash_new(3, INTEGER);
2051 5ad213f0 Filippos Giannakos
        struct map_node *map_node = calloc(100, sizeof(struct map_node));
2052 5ad213f0 Filippos Giannakos
        for (i = 0; i < 100; i++) {
2053 65062a91 Filippos Giannakos
                sprintf(buf_new +XSEG_MAX_TARGETLEN, "%u", i);
2054 6805c197 Giannakos Filippos
                gcry_md_hash_buffer(GCRY_MD_SHA256, buf, buf_new, strlen(buf_new));
2055 6805c197 Giannakos Filippos
                
2056 5ad213f0 Filippos Giannakos
                for (j = 0; j < SHA256_DIGEST_SIZE; j++) {
2057 5ad213f0 Filippos Giannakos
                        sprintf(map_node[i].object + 2*j, "%02x", buf[j]);
2058 5ad213f0 Filippos Giannakos
                }
2059 5ad213f0 Filippos Giannakos
                map_node[i].objectidx = i;
2060 65062a91 Filippos Giannakos
                map_node[i].objectlen = XSEG_MAX_TARGETLEN;
2061 5ad213f0 Filippos Giannakos
                map_node[i].flags = MF_OBJECT_EXIST;
2062 5ad213f0 Filippos Giannakos
                ret = insert_object(m, &map_node[i]);
2063 5ad213f0 Filippos Giannakos
        }
2064 5ad213f0 Filippos Giannakos

2065 5ad213f0 Filippos Giannakos
        char *data = malloc(block_size);
2066 5ad213f0 Filippos Giannakos
        mapheader_to_map(m, data);
2067 5ad213f0 Filippos Giannakos
        uint64_t pos = mapheader_size;
2068 5ad213f0 Filippos Giannakos

2069 5ad213f0 Filippos Giannakos
        for (i = 0; i < 100; i++) {
2070 5ad213f0 Filippos Giannakos
                map_node = find_object(m, i);
2071 5ad213f0 Filippos Giannakos
                if (!map_node){
2072 5ad213f0 Filippos Giannakos
                        printf("no object node %d \n", i);
2073 5ad213f0 Filippos Giannakos
                        exit(1);
2074 5ad213f0 Filippos Giannakos
                }
2075 5ad213f0 Filippos Giannakos
                object_to_map(data+pos, map_node);
2076 5ad213f0 Filippos Giannakos
                pos += objectsize_in_map;
2077 5ad213f0 Filippos Giannakos
        }
2078 5ad213f0 Filippos Giannakos
//        print_map(m);
2079 5ad213f0 Filippos Giannakos

2080 5ad213f0 Filippos Giannakos
        struct map *m2 = malloc(sizeof(struct map));
2081 65062a91 Filippos Giannakos
        strncpy(m2->volume, "012345678901234567890123456789ab012345678901234567890123456789ab", XSEG_MAX_TARGETLEN +1);
2082 65062a91 Filippos Giannakos
        m->volume[XSEG_MAX_TARGETLEN] = 0;
2083 65062a91 Filippos Giannakos
        m->volumelen = XSEG_MAX_TARGETLEN;
2084 5ad213f0 Filippos Giannakos

2085 5ad213f0 Filippos Giannakos
        m2->objects = xhash_new(3, INTEGER);
2086 5ad213f0 Filippos Giannakos
        ret = read_map(peer, m2, data);
2087 5ad213f0 Filippos Giannakos
//        print_map(m2);
2088 5ad213f0 Filippos Giannakos

2089 6805c197 Giannakos Filippos
        int fd = open(m->volume, O_CREAT|O_WRONLY, S_IRUSR|S_IWUSR|S_IRGRP|S_IWGRP|S_IROTH);
2090 5ad213f0 Filippos Giannakos
        ssize_t r, sum = 0;
2091 5ad213f0 Filippos Giannakos
        while (sum < block_size) {
2092 5ad213f0 Filippos Giannakos
                r = write(fd, data + sum, block_size -sum);
2093 5ad213f0 Filippos Giannakos
                if (r < 0){
2094 5ad213f0 Filippos Giannakos
                        perror("write");
2095 5ad213f0 Filippos Giannakos
                        printf("write error\n");
2096 5ad213f0 Filippos Giannakos
                        exit(1);
2097 5ad213f0 Filippos Giannakos
                } 
2098 5ad213f0 Filippos Giannakos
                sum += r;
2099 5ad213f0 Filippos Giannakos
        }
2100 5ad213f0 Filippos Giannakos
        close(fd);
2101 5ad213f0 Filippos Giannakos
        map_node = find_object(m, 0);
2102 5ad213f0 Filippos Giannakos
        free(map_node);
2103 5ad213f0 Filippos Giannakos
        free(m);
2104 5ad213f0 Filippos Giannakos
}
2105 65062a91 Filippos Giannakos
*/