Bump version to 0.3.5next
[archipelago] / xseg / peers / user / mapper-version2.c
1 /*
2  * Copyright 2013 GRNET S.A. All rights reserved.
3  *
4  * Redistribution and use in source and binary forms, with or
5  * without modification, are permitted provided that the following
6  * conditions are met:
7  *
8  *   1. Redistributions of source code must retain the above
9  *      copyright notice, this list of conditions and the following
10  *      disclaimer.
11  *   2. Redistributions in binary form must reproduce the above
12  *      copyright notice, this list of conditions and the following
13  *      disclaimer in the documentation and/or other materials
14  *      provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17  * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26  * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27  * POSSIBILITY OF SUCH DAMAGE.
28  *
29  * The views and conclusions contained in the software and
30  * documentation are those of the authors and should not be
31  * interpreted as representing official policies, either expressed
32  * or implied, of GRNET S.A.
33  */
34
35 #include <mapper.h>
36 #include <mapper-version2.h>
37 #include <xseg/xseg.h>
38
39 /* v2 functions */
40
41 static uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
42 {
43         uint32_t targetlen;
44         char buf[sizeof(block_id)*2 + 1];
45         hexlify((unsigned char *)&block_id, sizeof(block_id), buf);
46         buf[2*sizeof(block_id)] = 0;
47         sprintf(target, "%s_%s", map->volume, buf);
48         targetlen = map->volumelen + 1 + (sizeof(block_id) * 2);
49
50         return targetlen;
51 }
52
53 struct obj2chunk {
54         uint64_t start_obj;
55         uint64_t nr_objs;
56         char target[XSEG_MAX_TARGETLEN + 1];
57         uint32_t targetlen;
58         uint32_t offset;
59         uint32_t len;
60 };
61
62 static struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
63 {
64         struct obj2chunk ret;
65         uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
66         uint64_t start_map_block, start_chunk_in_map_block, start_obj_in_chunk;
67
68         nr_objs_per_chunk = v2_read_chunk_size/v2_objectsize_in_map;
69         nr_chunks_per_block = map->blocksize/v2_read_chunk_size;
70         nr_objs_per_block = nr_chunks_per_block * nr_objs_per_chunk;
71
72         start_map_block = start / nr_objs_per_block;
73         start_chunk_in_map_block = (start % nr_objs_per_block)/nr_objs_per_chunk;
74         start_obj_in_chunk = (start - start_map_block * nr_objs_per_block - start_chunk_in_map_block * nr_objs_per_chunk);
75
76         ret.targetlen = get_map_block_name(ret.target, map, start_map_block);
77
78         ret.start_obj = start;
79         if (start_obj_in_chunk + nr > nr_objs_per_chunk)
80                 ret.nr_objs = nr_objs_per_chunk - start_obj_in_chunk;
81         else
82                 ret.nr_objs = nr;
83
84         ret.offset = start_chunk_in_map_block * v2_read_chunk_size;
85         ret.offset += start_obj_in_chunk * v2_objectsize_in_map;
86         ret.len = ret.nr_objs * v2_objectsize_in_map;
87
88         XSEGLOG2(&lc, D, "For map %s, start: %llu, nr: %llu calculated: "
89                         "target: %s (%u), start_obj: %llu, nr_objs: %llu, "
90                         "offset: %u, len: %u", map->volume, start, nr,
91                         ret.target, ret.targetlen, ret.start_obj, ret.nr_objs,
92                         ret.offset, ret.len);
93
94         return ret;
95 }
96
97 int read_object_v2(struct map_node *mn, unsigned char *buf)
98 {
99         char c = buf[0];
100         int len = 0;
101         uint32_t objectlen;
102
103         mn->flags = 0;
104         mn->flags |= MF_OBJECT_WRITABLE & c;
105         mn->flags |= MF_OBJECT_ARCHIP & c;
106         mn->flags |= MF_OBJECT_ZERO & c;
107         objectlen = *(typeof(objectlen) *)(buf + 1);
108         mn->objectlen = objectlen;
109         if (mn->objectlen > v2_max_objectlen) {
110                 XSEGLOG2(&lc, D, "mn: %p, buf: %p, objectlen: %u", mn, buf, mn->objectlen);
111                 XSEGLOG2(&lc, E, "Invalid object len %u", mn->objectlen);
112                 return -1;
113         }
114
115         if (mn->flags & MF_OBJECT_ARCHIP){
116                 strcpy(mn->object, MAPPER_PREFIX);
117                 len += MAPPER_PREFIX_LEN;
118         }
119         memcpy(mn->object + len, buf + sizeof(objectlen) + 1, mn->objectlen);
120         mn->object[mn->objectlen] = 0;
121
122         return 0;
123 }
124
125 void object_to_map_v2(unsigned char* buf, struct map_node *mn)
126 {
127         uint32_t *objectlen;
128         uint32_t len;
129         buf[0] = 0;
130         buf[0] |= mn->flags & MF_OBJECT_WRITABLE;
131         buf[0] |= mn->flags & MF_OBJECT_ARCHIP;
132         buf[0] |= mn->flags & MF_OBJECT_ZERO;
133
134         if (!__builtin_types_compatible_p(typeof(mn->objectlen), typeof(*objectlen))) {
135                 XSEGLOG2(&lc, W, "Mapnode objectlen incompatible with map "
136                                  "objectlen buffer");
137         }
138
139         objectlen = (typeof(objectlen))(buf + 1);
140         *objectlen = mn->objectlen & 0xFFFFFFFF;
141         if (*objectlen > v2_max_objectlen) {
142                 XSEGLOG2(&lc, E, "Invalid object len %u", mn->objectlen);
143         }
144
145         len = 0;
146         if (mn->flags & MF_OBJECT_ARCHIP){
147                 /* strip common prefix */
148                 len += MAPPER_PREFIX_LEN;
149         }
150         memcpy((buf + 1 + sizeof(uint32_t)), mn->object + len, mn->objectlen);
151 }
152
153 struct xseg_request * prepare_write_objects_o2c_v2(struct peer_req *pr, struct map *map,
154                                 struct obj2chunk o2c)
155 {
156         struct xseg_request *req;
157         uint64_t limit, k, pos, datalen;
158         struct peerd *peer = pr->peer;
159         struct mapperd *mapper = __get_mapperd(peer);
160         char *data;
161         struct map_node *mn;
162
163         datalen = v2_read_chunk_size;
164
165         XSEGLOG2(&lc, D, "Starting for map %s, start_obj: %llu, nr_objs: %llu",
166                         map->volume, o2c.start_obj, o2c.nr_objs);
167
168         req = get_request(pr, mapper->mbportno, o2c.target,
169                         o2c.targetlen, datalen);
170         if (!req) {
171                 XSEGLOG2(&lc, E, "Cannot get request");
172                 return NULL;
173         }
174
175         req->op = X_WRITE;
176         req->offset = o2c.offset;
177         req->size = o2c.len;
178
179         data = xseg_get_data(peer->xseg, req);
180         limit = o2c.start_obj + o2c.nr_objs;
181         pos = 0;
182         for (k = o2c.start_obj; k < limit; k++) {
183                 mn = &map->objects[k];
184                 object_to_map_v2((unsigned char *)(data+pos), mn);
185                 pos += v2_objectsize_in_map;
186         }
187
188         return req;
189
190 }
191
192 struct xseg_request * prepare_write_objects_v2(struct peer_req *pr, struct map *map,
193                                 uint64_t start, uint64_t nr)
194 {
195         struct obj2chunk o2c;
196         o2c = get_chunk(map, start, nr);
197         if (o2c.nr_objs != nr) {
198                 return NULL;
199         }
200         return prepare_write_objects_o2c_v2(pr, map, o2c);
201 }
202
203 struct xseg_request * prepare_write_object_v2(struct peer_req *pr, struct map *map,
204                                 struct map_node *mn)
205 {
206         struct peerd *peer = pr->peer;
207         char *data;
208         struct xseg_request *req;
209
210         req = prepare_write_objects_v2(pr, map, mn->objectidx, 1);
211         if (!req)
212                 return NULL;
213
214         data = xseg_get_data(peer->xseg, req);
215         object_to_map_v2((unsigned char *)(data), mn);
216         return req;
217 }
218
219
220 int read_map_objects_v2(struct map *map, unsigned char *data, uint64_t start, uint64_t nr)
221 {
222         int r;
223         struct map_node *map_node;
224         uint64_t i;
225         uint64_t pos = 0;
226         char nulls[SHA256_DIGEST_SIZE];
227         memset(nulls, 0, SHA256_DIGEST_SIZE);
228
229         r = !memcmp(data, nulls, SHA256_DIGEST_SIZE);
230         if (r) {
231                 XSEGLOG2(&lc, E, "Data are zeros");
232                 return -1;
233         }
234
235         if (start + nr > map->nr_objs) {
236                 return -1;
237         }
238
239         if (!map->objects) {
240                 XSEGLOG2(&lc, D, "Allocating %llu nr_objs for size %llu",
241                                 map->nr_objs, map->size);
242                 map_node = calloc(map->nr_objs, sizeof(struct map_node));
243                 if (!map_node) {
244                         XSEGLOG2(&lc, E, "Cannot allocate mem for %llu objects",
245                                         map->nr_objs);
246                         return -1;
247                 }
248                 map->objects = map_node;
249                 r = initialize_map_objects(map);
250                 if (r < 0) {
251                         XSEGLOG2(&lc, E, "Cannot initialize map objects for map %s",
252                                         map->volume);
253                         goto out_free;
254                 }
255         }
256
257         map_node = map->objects;
258
259         for (i = start; i < nr; i++) {
260                 r = read_object_v2(&map_node[i], data+pos);
261                 if (r < 0) {
262                         XSEGLOG2(&lc, E, "Map %s: Could not read object %llu",
263                                         map->volume, i);
264                         goto out_free;
265                 }
266                 pos += v2_objectsize_in_map;
267         }
268         return 0;
269
270 out_free:
271         free(map->objects);
272         map->objects = NULL;
273         return -1;
274 }
275
276 int read_map_v2(struct map *m, unsigned char *data)
277 {
278         /* totally unsafe */
279         return read_map_objects_v2(m, data, 0, m->nr_objs);
280 }
281
282 void write_map_data_v2_cb(struct peer_req *pr, struct xseg_request *req)
283 {
284         struct mapper_io *mio = __get_mapper_io(pr);
285
286         if (req->state & XS_FAILED) {
287                 mio->err = 1;
288                 XSEGLOG2(&lc, E, "Request failed");
289                 goto out;
290         }
291
292         if (req->serviced != req->size) {
293                 mio->err = 1;
294                 XSEGLOG2(&lc, E, "Serviced != size");
295                 goto out;
296         }
297
298 out:
299         put_request(pr, req);
300         mio->pending_reqs--;
301         signal_pr(pr);
302         return;
303 }
304
305 int __write_map_data_v2(struct peer_req *pr, struct map *map)
306 {
307         int r;
308         struct peerd *peer = pr->peer;
309         struct mapperd *mapper = __get_mapperd(peer);
310         struct mapper_io *mio = __get_mapper_io(pr);
311         uint64_t datalen;
312         struct xseg_request *req;
313         char target[XSEG_MAX_TARGETLEN];
314         uint32_t targetlen;
315         uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
316         uint64_t nr_map_blocks, i, j;
317         uint64_t k, start, limit, pos, count;
318         char buf[sizeof(i)*2 + 1];
319         char *data;
320         struct map_node *mn;
321
322         datalen = v2_read_chunk_size;
323
324         count = 0;
325         nr_objs_per_chunk = v2_read_chunk_size/v2_objectsize_in_map;
326         nr_chunks_per_block = map->blocksize/v2_read_chunk_size;
327 //      nr_objs_per_block = map->blocksize / v2_objectsize_in_map;
328         nr_objs_per_block = nr_chunks_per_block * nr_objs_per_chunk;
329         nr_map_blocks = map->nr_objs / nr_objs_per_block;
330         if (map->nr_objs % nr_objs_per_block) {
331                 nr_map_blocks++;
332         }
333
334         XSEGLOG2(&lc, D, "nr_objs_per_chunk: %llu, nr_chunks_per_block: %llu, "
335                         "nr_objs_per_block: %llu, nr_map_blocks: %llu",
336                         nr_objs_per_chunk, nr_chunks_per_block, nr_objs_per_block,
337                         nr_map_blocks);
338         for (i = 0; i < nr_map_blocks && count < map->nr_objs; i++) {
339                 for (j = 0; j < nr_chunks_per_block && count < map->nr_objs; j++) {
340                         hexlify((unsigned char *)&i, sizeof(i), buf);
341                         buf[2*sizeof(i)] = 0;
342                         sprintf(target, "%s_%s", map->volume, buf);
343                         targetlen = map->volumelen + 1 + (sizeof(i) << 1);
344
345                         req = get_request(pr, mapper->mbportno, target,
346                                         targetlen, datalen);
347                         if (!req) {
348                                 XSEGLOG2(&lc, E, "Cannot get request");
349                                 goto out_err;
350                         }
351                         req->op = X_WRITE;
352                         req->offset = j * v2_read_chunk_size;
353                         req->size = v2_read_chunk_size;
354                         data = xseg_get_data(peer->xseg, req);
355                         start = i * nr_objs_per_block + j * nr_objs_per_chunk;
356                         limit = start + nr_objs_per_chunk;
357                         pos = 0;
358                         for (k = start; k < map->nr_objs && k < limit; k++) {
359                                 mn = &map->objects[k];
360                                 object_to_map_v2((unsigned char *)(data+pos), mn);
361                                 pos += v2_objectsize_in_map;
362                         }
363
364                         XSEGLOG2(&lc, D, "Writing chunk %s(%u) , offset :%llu",
365                                         target, targetlen, req->offset);
366
367                         r = send_request(pr, req);
368                         if (r < 0) {
369                                 XSEGLOG2(&lc, E, "Cannot send request");
370                                 goto out_put;
371                         }
372                         mio->pending_reqs++;
373                         count += nr_objs_per_chunk;
374                 }
375         }
376         return 0;
377
378 out_put:
379         put_request(pr, req);
380 out_err:
381         mio->err = 1;
382         return -1;
383 }
384
385 int __write_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
386 {
387         int r;
388         struct mapper_io *mio = __get_mapper_io(pr);
389         struct xseg_request *req;
390         struct obj2chunk o2c;
391
392         XSEGLOG2(&lc, D, "Writing objects for %s: start: %llu, nr: %llu",
393                         map->volume, start, nr);
394         if (start + nr > map->nr_objs) {
395                 XSEGLOG2(&lc, E, "Attempting to write beyond nr_objs");
396                 return -1;
397         }
398
399         while (nr > 0) {
400                 o2c = get_chunk(map, start, nr);
401
402                 req = prepare_write_objects_o2c_v2(pr, map, o2c);
403
404                 XSEGLOG2(&lc, D, "Writing chunk %s(%u) , offset :%llu",
405                                 o2c.target, o2c.targetlen, req->offset);
406
407
408                 r = send_request(pr, req);
409                 if (r < 0) {
410                         XSEGLOG2(&lc, E, "Cannot send request");
411                         put_request(pr, req);
412                         mio->err = 1;
413                         return -1;
414                 }
415                 mio->pending_reqs++;
416                 nr -= o2c.nr_objs;
417                 start += o2c.nr_objs;
418         }
419         return 0;
420 }
421
422 int write_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
423 {
424         int r;
425         //unsigned char *buf;
426         struct mapper_io *mio = __get_mapper_io(pr);
427         mio->cb = write_map_data_v2_cb;
428
429         r = __write_objects_v2(pr, map, start, nr);
430         if (r < 0)
431                 mio->err = 1;
432
433         if (mio->pending_reqs > 0)
434                 wait_on_pr(pr, mio->pending_reqs > 0);
435
436         mio->priv = NULL;
437         mio->cb = NULL;
438         return (mio->err ? -1 : 0);
439 }
440
441 int write_map_data_v2(struct peer_req *pr, struct map *map)
442 {
443         return write_objects_v2(pr, map, 0, map->nr_objs);
444 }
445
446 void load_map_data_v2_cb(struct peer_req *pr, struct xseg_request *req)
447 {
448         char *data;
449         unsigned char *buf;
450         struct mapper_io *mio = __get_mapper_io(pr);
451         struct peerd *peer = pr->peer;
452         buf = (unsigned char *)__get_node(mio, req);
453
454         XSEGLOG2(&lc, I, "Callback of req %p, buf: %p", req, buf);
455
456         //buf = (unsigned char *)mio->priv;
457         if (!buf) {
458                 XSEGLOG2(&lc, E, "Cannot get load buffer");
459                 mio->err = 1;
460                 goto out;
461         }
462
463         if (req->state & XS_FAILED) {
464                 mio->err = 1;
465                 XSEGLOG2(&lc, E, "Request failed");
466                 goto out;
467         }
468
469         if (req->serviced != req->size) {
470                 mio->err = 1;
471                 XSEGLOG2(&lc, E, "Serviced != size");
472                 goto out;
473         }
474
475         data = xseg_get_data(peer->xseg, req);
476         XSEGLOG2(&lc, D, "Memcpy %llu to %p (%u)", req->serviced, buf, *(uint32_t *)(data+1));
477         memcpy(buf, data, req->serviced);
478
479 out:
480         __set_node(mio, req, NULL);
481         put_request(pr, req);
482         mio->pending_reqs--;
483         signal_pr(pr);
484         return;
485 }
486
487 int __load_map_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr, unsigned char *buf)
488 {
489         int r;
490         struct peerd *peer = pr->peer;
491         struct mapperd *mapper = __get_mapperd(peer);
492         struct mapper_io *mio = __get_mapper_io(pr);
493         uint64_t datalen;
494         struct xseg_request *req;
495         struct obj2chunk o2c;
496
497         datalen = v2_read_chunk_size;
498
499         if (start + nr > map->nr_objs) {
500                 XSEGLOG2(&lc, E, "Attempting to load beyond nr_objs");
501                 return -1;
502         }
503
504         while (nr > 0) {
505                 o2c = get_chunk(map, start, nr);
506
507                 req = get_request(pr, mapper->mbportno, o2c.target,
508                                 o2c.targetlen, datalen);
509                 if (!req) {
510                         XSEGLOG2(&lc, E, "Cannot get request");
511                         goto out_err;
512                 }
513                 req->op = X_READ;
514                 req->offset = o2c.offset;
515                 req->size = o2c.len;
516
517                 XSEGLOG2(&lc, D, "Reading chunk %s(%u) , offset :%llu",
518                                 o2c.target, o2c.targetlen, req->offset);
519
520                 r = __set_node(mio, req, (struct map_node *)(buf + o2c.start_obj * v2_objectsize_in_map));
521
522                 r = send_request(pr, req);
523                 if (r < 0) {
524                         XSEGLOG2(&lc, E, "Cannot send request");
525                         goto out_put;
526                 }
527                 mio->pending_reqs++;
528                 nr -= o2c.nr_objs;
529                 start += o2c.nr_objs;
530         }
531         return 0;
532
533 out_put:
534         put_request(pr, req);
535 out_err:
536         mio->err = 1;
537         return -1;
538 }
539
540 int load_map_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
541 {
542         int r;
543         unsigned char *buf;
544         struct mapper_io *mio = __get_mapper_io(pr);
545         uint32_t buf_size = sizeof(unsigned char) * nr * v2_objectsize_in_map;
546         uint32_t rem;
547
548         if (map->flags & MF_MAP_DELETED) {
549                 XSEGLOG2(&lc, I, "Map deleted. Ignoring loading objects");
550                 return 0;
551         }
552
553         if (buf_size < v2_read_chunk_size) {
554                 buf_size = v2_read_chunk_size;
555         }
556         /* buf size must be a multiple of v2_read_chunk_size */
557         rem = buf_size % v2_read_chunk_size;
558         XSEGLOG2(&lc, D, "Buf size %u, rem: %u", buf_size, rem);
559         if (rem)
560                 buf_size += (v2_read_chunk_size - rem);
561         XSEGLOG2(&lc, D, "Allocating %u bytes buffer", buf_size);
562         buf = malloc(buf_size);
563         if (!buf) {
564                 XSEGLOG2(&lc, E, "Cannot allocate memory");
565                 return -1;
566         }
567
568         mio->priv = buf;
569         mio->cb = load_map_data_v2_cb;
570
571         r = __load_map_objects_v2(pr, map, start, nr, buf);
572         if (r < 0)
573                 mio->err = 1;
574
575         if (mio->pending_reqs > 0)
576                 wait_on_pr(pr, mio->pending_reqs > 0);
577
578         if (mio->err) {
579                 XSEGLOG2(&lc, E, "Error issuing load request");
580                 goto out;
581         }
582         r = read_map_objects_v2(map, buf, start, nr);
583         if (r < 0) {
584                 mio->err = 1;
585         }
586 out:
587         free(buf);
588         mio->priv = NULL;
589         mio->cb = NULL;
590         return (mio->err ? -1 : 0);
591 }
592
593 int load_map_data_v2(struct peer_req *pr, struct map *map)
594 {
595         return load_map_objects_v2(pr, map, 0, map->nr_objs);
596 }
597
598 int read_map_metadata_v2(struct map *map, unsigned char *metadata,
599                 uint32_t metadata_len)
600 {
601         int r;
602         char nulls[v2_mapheader_size];
603         uint64_t pos;
604         if (metadata_len < v2_mapheader_size) {
605                 XSEGLOG2(&lc, E, "Metadata len < v2_mapheader_size");
606                 return -1;
607         }
608         memset(nulls, 0, v2_mapheader_size);
609         r = !memcmp(metadata, nulls, v2_mapheader_size);
610         if (r) {
611                 XSEGLOG2(&lc, E, "Read zeros");
612                 return -1;
613         }
614
615         pos = 0;
616         /* read header */
617         map->version = *(uint32_t *)(metadata + pos);
618         pos += sizeof(uint32_t);
619         map->size = *(uint64_t *)(metadata + pos);
620         pos += sizeof(uint64_t);
621         map->blocksize = *(uint32_t *)(metadata + pos);
622         pos += sizeof(uint32_t);
623         //FIXME check each flag seperately
624         map->flags = *(uint32_t *)(metadata + pos);
625         pos += sizeof(uint32_t);
626         map->epoch = *(uint64_t *)(metadata + pos);
627         pos += sizeof(uint64_t);
628         /* sanitize flags */
629         //map->flags &= MF_MAP_SANITIZE;
630
631         map->nr_objs = calc_map_obj(map);
632
633         return 0;
634 }
635
636 struct xseg_request * __write_map_metadata_v2(struct peer_req *pr, struct map *map)
637 {
638         struct peerd *peer = pr->peer;
639         struct mapperd *mapper = __get_mapperd(peer);
640         uint64_t datalen;
641         struct xseg_request *req;
642         char *data;
643         uint64_t pos;
644         int r;
645
646         datalen = v2_mapheader_size;
647
648         req = get_request(pr, mapper->mbportno, map->volume, map->volumelen,
649                         datalen);
650         if (!req){
651                 XSEGLOG2(&lc, E, "Cannot get request for map %s",
652                                 map->volume);
653                 goto out_err;
654         }
655
656
657         data = xseg_get_data(peer->xseg, req);
658         pos = 0;
659         memcpy(data + pos, &map->version, sizeof(map->version));
660         pos += sizeof(map->version);
661         memcpy(data + pos, &map->size, sizeof(map->size));
662         pos += sizeof(map->size);
663         memcpy(data + pos, &map->blocksize, sizeof(map->blocksize));
664         pos += sizeof(map->blocksize);
665         //FIXME check each flag seperately
666         memcpy(data + pos, &map->flags, sizeof(map->flags));
667         pos += sizeof(map->flags);
668         memcpy(data + pos, &map->epoch, sizeof(map->epoch));
669         pos += sizeof(map->epoch);
670
671         req->op = X_WRITE;
672         req->size = datalen;
673         req->offset = 0;
674
675         r = send_request(pr, req);
676         if (r < 0) {
677                 XSEGLOG2(&lc, E, "Cannot send request %p, pr: %p, map: %s",
678                                 req, pr, map->volume);
679                 goto out_put;
680         }
681         return req;
682
683 out_put:
684         put_request(pr, req);
685 out_err:
686         return NULL;
687 }
688
689 int write_map_metadata_v2(struct peer_req *pr, struct map *map)
690 {
691         struct xseg_request *req;
692
693         req = __write_map_metadata_v2(pr, map);
694         if (!req)
695                 return -1;
696         wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));
697
698         if (req->state & XS_FAILED){
699                 XSEGLOG2(&lc, E, "Write map metadata failed for map %s", map->volume);
700                 put_request(pr, req);
701                 return -1;
702         }
703         put_request(pr, req);
704         return 0;
705 }
706