/*
 * Copyright 2013 GRNET S.A. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or
 * without modification, are permitted provided that the following
 * conditions are met:
 *
 *   1. Redistributions of source code must retain the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer.
 *   2. Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials
 *      provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
 * OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
 * ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 * The views and conclusions contained in the software and
 * documentation are those of the authors and should not be
 * interpreted as representing official policies, either expressed
 * or implied, of GRNET S.A.
 */
#include <stdio.h>   /* sprintf */
#include <stdlib.h>  /* malloc, calloc, free */
#include <string.h>  /* memcpy, memset, memcmp, strcpy */

#include <mapper.h>  /* assumed: struct map, struct map_node, peer/mapper helpers */
#include <mapper-version2.h>
#include <xseg/xseg.h>
static uint32_t get_map_block_name(char *target, struct map *map, uint64_t block_id)
{
    uint32_t targetlen;
    char buf[sizeof(block_id) * 2 + 1];

    hexlify((unsigned char *)&block_id, sizeof(block_id), buf);
    buf[2 * sizeof(block_id)] = 0;
    sprintf(target, "%s_%s", map->volume, buf);
    targetlen = map->volumelen + 1 + (sizeof(block_id) * 2);

    return targetlen;
}

/* Fields reconstructed from their uses throughout this file. */
struct obj2chunk {
    uint64_t start_obj;                  /* first object covered by this chunk */
    uint64_t nr_objs;                    /* how many objects the chunk covers */
    char target[XSEG_MAX_TARGETLEN + 1]; /* map block object name */
    uint32_t targetlen;
    uint32_t offset;                     /* byte offset inside the map block */
    uint32_t len;                        /* payload length in bytes */
};
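/*
 * Example (illustrative): for a map volume named "myvolume" and
 * block_id 1, get_map_block_name() above hexlifies the 8-byte id
 * into 16 hex digits, producing a block object name such as
 * "myvolume_0100000000000000" (the exact digit order depends on
 * hexlify()'s byte order), with targetlen = volumelen + 1 + 16.
 */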
static struct obj2chunk get_chunk(struct map *map, uint64_t start, uint64_t nr)
{
    struct obj2chunk ret;
    uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
    uint64_t start_map_block, start_chunk_in_map_block, start_obj_in_chunk;

    nr_objs_per_chunk = v2_read_chunk_size / v2_objectsize_in_map;
    nr_chunks_per_block = map->blocksize / v2_read_chunk_size;
    nr_objs_per_block = nr_chunks_per_block * nr_objs_per_chunk;

    start_map_block = start / nr_objs_per_block;
    start_chunk_in_map_block = (start % nr_objs_per_block) / nr_objs_per_chunk;
    start_obj_in_chunk = start - start_map_block * nr_objs_per_block
            - start_chunk_in_map_block * nr_objs_per_chunk;

    ret.targetlen = get_map_block_name(ret.target, map, start_map_block);

    ret.start_obj = start;
    /* Clamp the request to the end of the current chunk. */
    if (start_obj_in_chunk + nr > nr_objs_per_chunk)
        ret.nr_objs = nr_objs_per_chunk - start_obj_in_chunk;
    else
        ret.nr_objs = nr;

    ret.offset = start_chunk_in_map_block * v2_read_chunk_size;
    ret.offset += start_obj_in_chunk * v2_objectsize_in_map;
    ret.len = ret.nr_objs * v2_objectsize_in_map;

    XSEGLOG2(&lc, D, "For map %s, start: %llu, nr: %llu calculated: "
            "target: %s (%u), start_obj: %llu, nr_objs: %llu, "
            "offset: %u, len: %u", map->volume, start, nr,
            ret.target, ret.targetlen, ret.start_obj, ret.nr_objs,
            ret.offset, ret.len);

    return ret;
}
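/*
 * Worked example of the chunk arithmetic above, with illustrative
 * values (v2_objectsize_in_map = 128, v2_read_chunk_size = 512KiB,
 * blocksize = 4MiB; the real constants live in mapper-version2.h):
 *
 *   nr_objs_per_chunk   = 524288 / 128      = 4096
 *   nr_chunks_per_block = 4194304 / 524288  = 8
 *   nr_objs_per_block   = 8 * 4096          = 32768
 *
 * For start = 40000, nr = 2000:
 *   start_map_block          = 40000 / 32768         = 1
 *   start_chunk_in_map_block = (40000 % 32768) / 4096 = 1
 *   start_obj_in_chunk       = 40000 - 32768 - 4096   = 3136
 *   nr_objs = 4096 - 3136 = 960    (clamped to the chunk end)
 *   offset  = 1 * 524288 + 3136 * 128 = 925696
 *   len     = 960 * 128 = 122880
 */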
int read_object_v2(struct map_node *mn, unsigned char *buf)
{
    char c = buf[0];
    int len = 0;
    uint32_t objectlen;

    mn->flags = 0;
    mn->flags |= MF_OBJECT_WRITABLE & c;
    mn->flags |= MF_OBJECT_ARCHIP & c;
    mn->flags |= MF_OBJECT_ZERO & c;
    objectlen = *(typeof(objectlen) *)(buf + 1);
    mn->objectlen = objectlen;
    if (mn->objectlen > v2_max_objectlen) {
        XSEGLOG2(&lc, D, "mn: %p, buf: %p, objectlen: %u", mn, buf, mn->objectlen);
        XSEGLOG2(&lc, E, "Invalid object len %u", mn->objectlen);
        return -1;
    }

    if (mn->flags & MF_OBJECT_ARCHIP) {
        /* the common prefix is stripped on disk; restore it */
        strcpy(mn->object, MAPPER_PREFIX);
        len += MAPPER_PREFIX_LEN;
    }
    memcpy(mn->object + len, buf + sizeof(objectlen) + 1, mn->objectlen);
    mn->object[mn->objectlen] = 0;

    return 0;
}
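/*
 * On-disk layout of one map entry, as (de)serialized by
 * read_object_v2() above and object_to_map_v2() below (one entry
 * per v2_objectsize_in_map bytes):
 *
 *   byte  0     : flag bits (MF_OBJECT_WRITABLE, MF_OBJECT_ARCHIP,
 *                 MF_OBJECT_ZERO)
 *   bytes 1..4  : objectlen, a uint32_t stored in host byte order
 *   bytes 5..   : object name; when MF_OBJECT_ARCHIP is set, the
 *                 common MAPPER_PREFIX is stripped on disk and
 *                 restored on load
 */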
void object_to_map_v2(unsigned char *buf, struct map_node *mn)
{
    uint32_t *objectlen;
    uint32_t len = 0;

    buf[0] = 0;
    buf[0] |= mn->flags & MF_OBJECT_WRITABLE;
    buf[0] |= mn->flags & MF_OBJECT_ARCHIP;
    buf[0] |= mn->flags & MF_OBJECT_ZERO;

    /* compile-time check that the map node field matches the on-disk width */
    if (!__builtin_types_compatible_p(typeof(mn->objectlen), typeof(*objectlen))) {
        XSEGLOG2(&lc, W, "Mapnode objectlen incompatible with map "
                "objectlen buffer");
    }

    objectlen = (typeof(objectlen))(buf + 1);
    *objectlen = mn->objectlen & 0xFFFFFFFF;
    if (*objectlen > v2_max_objectlen) {
        XSEGLOG2(&lc, E, "Invalid object len %u", mn->objectlen);
    }

    if (mn->flags & MF_OBJECT_ARCHIP) {
        /* strip common prefix */
        len += MAPPER_PREFIX_LEN;
    }
    memcpy((buf + 1 + sizeof(uint32_t)), mn->object + len, mn->objectlen);
}
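/*
 * Note: object_to_map_v2() and read_object_v2() are intended as
 * inverses: serializing a node and reading the entry back recovers
 * the same flags, objectlen and (prefix-restored) name. mn->objectlen
 * appears to count the full in-memory name including any prefix, so
 * the write side copies objectlen bytes from past the prefix and the
 * read side re-terminates the name at mn->objectlen.
 */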
struct xseg_request * prepare_write_objects_o2c_v2(struct peer_req *pr, struct map *map,
                struct obj2chunk o2c)
{
    struct xseg_request *req;
    uint64_t limit, k, pos, datalen;
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    struct map_node *mn;
    char *data;

    datalen = v2_read_chunk_size;

    XSEGLOG2(&lc, D, "Starting for map %s, start_obj: %llu, nr_objs: %llu",
            map->volume, o2c.start_obj, o2c.nr_objs);

    req = get_request(pr, mapper->mbportno, o2c.target,
            o2c.targetlen, datalen);
    if (!req) {
        XSEGLOG2(&lc, E, "Cannot get request");
        return NULL;
    }

    req->op = X_WRITE;
    req->offset = o2c.offset;
    req->size = o2c.len;

    data = xseg_get_data(peer->xseg, req);
    limit = o2c.start_obj + o2c.nr_objs;
    pos = 0;
    for (k = o2c.start_obj; k < limit; k++) {
        mn = &map->objects[k];
        object_to_map_v2((unsigned char *)(data + pos), mn);
        pos += v2_objectsize_in_map;
    }

    return req;
}
struct xseg_request * prepare_write_objects_v2(struct peer_req *pr, struct map *map,
                uint64_t start, uint64_t nr)
{
    struct obj2chunk o2c;

    o2c = get_chunk(map, start, nr);
    if (o2c.nr_objs != nr) {
        return NULL;    /* the range must fit inside a single chunk */
    }
    return prepare_write_objects_o2c_v2(pr, map, o2c);
}
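/*
 * With the illustrative 4096-objects-per-chunk geometry above,
 * asking this helper for objects [3000, 3000 + 2000) straddles a
 * chunk boundary: get_chunk() clamps nr_objs to 4096 - 3000 = 1096,
 * the nr check fails, and the function returns NULL.
 */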
struct xseg_request * prepare_write_object_v2(struct peer_req *pr, struct map *map,
                struct map_node *mn)
{
    struct peerd *peer = pr->peer;
    char *data;
    struct xseg_request *req;

    req = prepare_write_objects_v2(pr, map, mn->objectidx, 1);
    if (!req)
        return NULL;
    data = xseg_get_data(peer->xseg, req);
    object_to_map_v2((unsigned char *)data, mn);
    return req;
}
int read_map_objects_v2(struct map *map, unsigned char *data, uint64_t start, uint64_t nr)
{
    int r;
    struct map_node *map_node;
    uint64_t i, pos = 0;

    char nulls[SHA256_DIGEST_SIZE];
    memset(nulls, 0, SHA256_DIGEST_SIZE);

    r = !memcmp(data, nulls, SHA256_DIGEST_SIZE);
    if (r) {
        XSEGLOG2(&lc, E, "Data are zeros");
        return -1;
    }

    if (start + nr > map->nr_objs) {
        XSEGLOG2(&lc, E, "Attempting to read beyond nr_objs");
        return -1;
    }

    if (!map->objects) {
        XSEGLOG2(&lc, D, "Allocating %llu nr_objs for size %llu",
                map->nr_objs, map->size);
        map_node = calloc(map->nr_objs, sizeof(struct map_node));
        if (!map_node) {
            XSEGLOG2(&lc, E, "Cannot allocate mem for %llu objects",
                    map->nr_objs);
            return -1;
        }
        map->objects = map_node;
        r = initialize_map_objects(map);
        if (r < 0) {
            XSEGLOG2(&lc, E, "Cannot initialize map objects for map %s",
                    map->volume);
            goto out_free;
        }
    }

    map_node = map->objects;

    /* parse one entry per object in [start, start + nr) */
    for (i = start; i < start + nr; i++) {
        r = read_object_v2(&map_node[i], data + pos);
        if (r < 0) {
            XSEGLOG2(&lc, E, "Map %s: Could not read object %llu",
                    map->volume, i);
            goto out_free;
        }
        pos += v2_objectsize_in_map;
    }
    return 0;

out_free:
    free(map->objects);
    map->objects = NULL;
    return -1;
}
int read_map_v2(struct map *m, unsigned char *data)
{
    return read_map_objects_v2(m, data, 0, m->nr_objs);
}
void write_map_data_v2_cb(struct peer_req *pr, struct xseg_request *req)
{
    struct mapper_io *mio = __get_mapper_io(pr);

    if (req->state & XS_FAILED) {
        mio->err = 1;
        XSEGLOG2(&lc, E, "Request failed");
        goto out;
    }

    if (req->serviced != req->size) {
        mio->err = 1;
        XSEGLOG2(&lc, E, "Serviced != size");
        goto out;
    }

out:
    put_request(pr, req);
    mio->pending_reqs--;
    signal_pr(pr);
    return;
}
int __write_map_data_v2(struct peer_req *pr, struct map *map)
{
    int r;
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    struct mapper_io *mio = __get_mapper_io(pr);
    struct map_node *mn;
    struct xseg_request *req;
    char target[XSEG_MAX_TARGETLEN];
    uint32_t targetlen;
    uint64_t datalen;
    uint64_t nr_objs_per_block, nr_objs_per_chunk, nr_chunks_per_block;
    uint64_t nr_map_blocks, i, j;
    uint64_t k, start, limit, pos, count;
    char buf[sizeof(i) * 2 + 1];
    char *data;

    datalen = v2_read_chunk_size;
    count = 0;

    nr_objs_per_chunk = v2_read_chunk_size / v2_objectsize_in_map;
    nr_chunks_per_block = map->blocksize / v2_read_chunk_size;
    nr_objs_per_block = nr_chunks_per_block * nr_objs_per_chunk;
    nr_map_blocks = map->nr_objs / nr_objs_per_block;
    if (map->nr_objs % nr_objs_per_block) {
        nr_map_blocks++;
    }

    XSEGLOG2(&lc, D, "nr_objs_per_chunk: %llu, nr_chunks_per_block: %llu, "
            "nr_objs_per_block: %llu, nr_map_blocks: %llu",
            nr_objs_per_chunk, nr_chunks_per_block, nr_objs_per_block,
            nr_map_blocks);
    for (i = 0; i < nr_map_blocks && count < map->nr_objs; i++) {
        for (j = 0; j < nr_chunks_per_block && count < map->nr_objs; j++) {
            hexlify((unsigned char *)&i, sizeof(i), buf);
            buf[2 * sizeof(i)] = 0;
            sprintf(target, "%s_%s", map->volume, buf);
            targetlen = map->volumelen + 1 + (sizeof(i) << 1);

            req = get_request(pr, mapper->mbportno, target,
                    targetlen, datalen);
            if (!req) {
                XSEGLOG2(&lc, E, "Cannot get request");
                goto out_err;
            }
            req->op = X_WRITE;
            req->offset = j * v2_read_chunk_size;
            req->size = v2_read_chunk_size;
            data = xseg_get_data(peer->xseg, req);
            start = i * nr_objs_per_block + j * nr_objs_per_chunk;
            limit = start + nr_objs_per_chunk;
            pos = 0;
            for (k = start; k < map->nr_objs && k < limit; k++) {
                mn = &map->objects[k];
                object_to_map_v2((unsigned char *)(data + pos), mn);
                pos += v2_objectsize_in_map;
            }
            XSEGLOG2(&lc, D, "Writing chunk %s(%u) , offset :%llu",
                    target, targetlen, req->offset);

            r = send_request(pr, req);
            if (r < 0) {
                XSEGLOG2(&lc, E, "Cannot send request");
                goto out_put;
            }
            mio->pending_reqs++;
            count += nr_objs_per_chunk;
        }
    }

    return 0;

out_put:
    put_request(pr, req);
out_err:
    mio->err = 1;
    return -1;
}
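/*
 * With the illustrative geometry above (8 chunks of 4096 objects per
 * block), a map of 40000 objects is flushed as two blocks: block 0
 * gets 8 full chunks (objects 0..32767) and block 1 gets 2 chunks,
 * the last one only partially filled (objects 32768..39999), since
 * the inner k-loop also stops at map->nr_objs.
 */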
int __write_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
{
    int r;
    struct mapper_io *mio = __get_mapper_io(pr);
    struct xseg_request *req;
    struct obj2chunk o2c;

    XSEGLOG2(&lc, D, "Writing objects for %s: start: %llu, nr: %llu",
            map->volume, start, nr);
    if (start + nr > map->nr_objs) {
        XSEGLOG2(&lc, E, "Attempting to write beyond nr_objs");
        return -1;
    }

    /* issue one request per chunk until the whole range is covered */
    while (nr > 0) {
        o2c = get_chunk(map, start, nr);

        req = prepare_write_objects_o2c_v2(pr, map, o2c);
        if (!req)
            return -1;
        XSEGLOG2(&lc, D, "Writing chunk %s(%u) , offset :%llu",
                o2c.target, o2c.targetlen, req->offset);

        r = send_request(pr, req);
        if (r < 0) {
            XSEGLOG2(&lc, E, "Cannot send request");
            put_request(pr, req);
            return -1;
        }
        mio->pending_reqs++;
        nr -= o2c.nr_objs;
        start += o2c.nr_objs;
    }

    return 0;
}
int write_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
{
    int r;
    struct mapper_io *mio = __get_mapper_io(pr);

    mio->cb = write_map_data_v2_cb;

    r = __write_objects_v2(pr, map, start, nr);
    if (r < 0)
        mio->err = 1;

    if (mio->pending_reqs > 0)
        wait_on_pr(pr, mio->pending_reqs > 0);

    mio->cb = NULL;
    return (mio->err ? -1 : 0);
}
int write_map_data_v2(struct peer_req *pr, struct map *map)
{
    return write_objects_v2(pr, map, 0, map->nr_objs);
}
void load_map_data_v2_cb(struct peer_req *pr, struct xseg_request *req)
{
    char *data;
    unsigned char *buf;
    struct mapper_io *mio = __get_mapper_io(pr);
    struct peerd *peer = pr->peer;

    buf = (unsigned char *)__get_node(mio, req);

    XSEGLOG2(&lc, I, "Callback of req %p, buf: %p", req, buf);

    if (!buf) {
        XSEGLOG2(&lc, E, "Cannot get load buffer");
        mio->err = 1;
        goto out;
    }

    if (req->state & XS_FAILED) {
        mio->err = 1;
        XSEGLOG2(&lc, E, "Request failed");
        goto out;
    }

    if (req->serviced != req->size) {
        mio->err = 1;
        XSEGLOG2(&lc, E, "Serviced != size");
        goto out;
    }

    data = xseg_get_data(peer->xseg, req);
    XSEGLOG2(&lc, D, "Memcpy %llu to %p (%u)", req->serviced, buf, *(uint32_t *)(data + 1));
    memcpy(buf, data, req->serviced);

out:
    __set_node(mio, req, NULL);
    put_request(pr, req);
    mio->pending_reqs--;
    signal_pr(pr);
    return;
}
int __load_map_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr, unsigned char *buf)
{
    int r;
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    struct mapper_io *mio = __get_mapper_io(pr);
    uint64_t datalen;
    struct xseg_request *req;
    struct obj2chunk o2c;

    datalen = v2_read_chunk_size;

    if (start + nr > map->nr_objs) {
        XSEGLOG2(&lc, E, "Attempting to load beyond nr_objs");
        goto out_err;
    }

    /* issue one read request per chunk until the whole range is covered */
    while (nr > 0) {
        o2c = get_chunk(map, start, nr);

        req = get_request(pr, mapper->mbportno, o2c.target,
                o2c.targetlen, datalen);
        if (!req) {
            XSEGLOG2(&lc, E, "Cannot get request");
            goto out_err;
        }
        req->op = X_READ;
        req->offset = o2c.offset;
        req->size = o2c.len;

        XSEGLOG2(&lc, D, "Reading chunk %s(%u) , offset :%llu",
                o2c.target, o2c.targetlen, req->offset);

        r = __set_node(mio, req, (struct map_node *)(buf + o2c.start_obj * v2_objectsize_in_map));

        r = send_request(pr, req);
        if (r < 0) {
            XSEGLOG2(&lc, E, "Cannot send request");
            goto out_put;
        }
        mio->pending_reqs++;
        nr -= o2c.nr_objs;
        start += o2c.nr_objs;
    }

    return 0;

out_put:
    put_request(pr, req);
out_err:
    mio->err = 1;
    return -1;
}
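/*
 * __set_node() remembers which slice of the caller's buffer each
 * in-flight request belongs to; load_map_data_v2_cb() looks that
 * slice up again with __get_node() when the reply arrives, so the
 * per-chunk reads may complete in any order.
 */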
int load_map_objects_v2(struct peer_req *pr, struct map *map, uint64_t start, uint64_t nr)
{
    int r;
    struct mapper_io *mio = __get_mapper_io(pr);
    unsigned char *buf;
    uint32_t buf_size = sizeof(unsigned char) * nr * v2_objectsize_in_map;
    uint32_t rem;

    if (map->flags & MF_MAP_DELETED) {
        XSEGLOG2(&lc, I, "Map deleted. Ignoring loading objects");
        return 0;
    }

    if (buf_size < v2_read_chunk_size) {
        buf_size = v2_read_chunk_size;
    }
    /* buf size must be a multiple of v2_read_chunk_size */
    rem = buf_size % v2_read_chunk_size;
    XSEGLOG2(&lc, D, "Buf size %u, rem: %u", buf_size, rem);
    if (rem)
        buf_size += (v2_read_chunk_size - rem);
    XSEGLOG2(&lc, D, "Allocating %u bytes buffer", buf_size);
    buf = malloc(buf_size);
    if (!buf) {
        XSEGLOG2(&lc, E, "Cannot allocate memory");
        return -1;
    }

    mio->cb = load_map_data_v2_cb;

    r = __load_map_objects_v2(pr, map, start, nr, buf);
    if (r < 0)
        mio->err = 1;

    if (mio->pending_reqs > 0)
        wait_on_pr(pr, mio->pending_reqs > 0);

    if (mio->err) {
        XSEGLOG2(&lc, E, "Error issuing load request");
        goto out;
    }

    r = read_map_objects_v2(map, buf, start, nr);
    if (r < 0)
        mio->err = 1;

out:
    free(buf);
    mio->cb = NULL;
    return (mio->err ? -1 : 0);
}
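/*
 * Rounding example (sizes illustrative, as above): loading 5000
 * objects at 128 bytes each needs 640000 bytes; 640000 % 524288 =
 * 115712, so the buffer grows by 524288 - 115712 = 408576 bytes to
 * 1048576, i.e. exactly two read chunks.
 */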
int load_map_data_v2(struct peer_req *pr, struct map *map)
{
    return load_map_objects_v2(pr, map, 0, map->nr_objs);
}
int read_map_metadata_v2(struct map *map, unsigned char *metadata,
        uint32_t metadata_len)
{
    int r;
    uint64_t pos;
    char nulls[v2_mapheader_size];

    if (metadata_len < v2_mapheader_size) {
        XSEGLOG2(&lc, E, "Metadata len < v2_mapheader_size");
        return -1;
    }

    memset(nulls, 0, v2_mapheader_size);
    r = !memcmp(metadata, nulls, v2_mapheader_size);
    if (r) {
        XSEGLOG2(&lc, E, "Read zeros");
        return -1;
    }

    pos = 0;
    map->version = *(uint32_t *)(metadata + pos);
    pos += sizeof(uint32_t);
    map->size = *(uint64_t *)(metadata + pos);
    pos += sizeof(uint64_t);
    map->blocksize = *(uint32_t *)(metadata + pos);
    pos += sizeof(uint32_t);
    //FIXME check each flag separately
    map->flags = *(uint32_t *)(metadata + pos);
    pos += sizeof(uint32_t);
    map->epoch = *(uint64_t *)(metadata + pos);
    pos += sizeof(uint64_t);

    map->nr_objs = calc_map_obj(map);

    return 0;
}
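/*
 * v2 map header layout, as parsed above and written back by
 * __write_map_metadata_v2() below (byte offsets; the 28-byte total
 * is what v2_mapheader_size is expected to be):
 *
 *    0  version    uint32_t
 *    4  size       uint64_t
 *   12  blocksize  uint32_t
 *   16  flags      uint32_t
 *   20  epoch      uint64_t
 */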
struct xseg_request * __write_map_metadata_v2(struct peer_req *pr, struct map *map)
{
    int r;
    struct peerd *peer = pr->peer;
    struct mapperd *mapper = __get_mapperd(peer);
    struct xseg_request *req;
    char *data;
    uint64_t datalen, pos;

    datalen = v2_mapheader_size;

    req = get_request(pr, mapper->mbportno, map->volume, map->volumelen,
            datalen);
    if (!req) {
        XSEGLOG2(&lc, E, "Cannot get request for map %s",
                map->volume);
        return NULL;
    }

    data = xseg_get_data(peer->xseg, req);
    pos = 0;
    memcpy(data + pos, &map->version, sizeof(map->version));
    pos += sizeof(map->version);
    memcpy(data + pos, &map->size, sizeof(map->size));
    pos += sizeof(map->size);
    memcpy(data + pos, &map->blocksize, sizeof(map->blocksize));
    pos += sizeof(map->blocksize);
    //FIXME check each flag separately
    memcpy(data + pos, &map->flags, sizeof(map->flags));
    pos += sizeof(map->flags);
    memcpy(data + pos, &map->epoch, sizeof(map->epoch));
    pos += sizeof(map->epoch);

    req->op = X_WRITE;
    req->size = datalen;
    req->offset = 0;

    r = send_request(pr, req);
    if (r < 0) {
        XSEGLOG2(&lc, E, "Cannot send request %p, pr: %p, map: %s",
                req, pr, map->volume);
        goto out_put;
    }

    return req;

out_put:
    put_request(pr, req);
    return NULL;
}
int write_map_metadata_v2(struct peer_req *pr, struct map *map)
{
    struct xseg_request *req;

    req = __write_map_metadata_v2(pr, map);
    if (!req)
        return -1;
    wait_on_pr(pr, (!(req->state & XS_FAILED || req->state & XS_SERVED)));

    if (req->state & XS_FAILED) {
        XSEGLOG2(&lc, E, "Write map metadata failed for map %s", map->volume);
        put_request(pr, req);
        return -1;
    }

    put_request(pr, req);
    return 0;
}
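/*
 * Typical call sequence for a v2 map, sketched under the assumption
 * that the caller already holds a peer request for the map:
 *
 *   read_map_metadata_v2(map, metadata, len);  // parse the header
 *   load_map_data_v2(pr, map);                 // fetch + parse all objects
 *   ... mutate map->objects ...
 *   write_map_data_v2(pr, map);                // flush every chunk
 *   write_map_metadata_v2(pr, map);            // persist the header
 */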