1 # Copyright 2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
37 from archipelago.archipelago import start_peer, stop_peer
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
def get_random_string(length=64, repeat=16):
    """Return a string of ``length`` random lowercase ASCII letters.

    Only ``repeat`` characters are actually drawn at random; that pattern is
    then tiled (and truncated) to reach ``length`` characters. This keeps the
    generation of large test payloads cheap.

    :param length: number of characters in the returned string.
    :param repeat: length of the random pattern that gets tiled; must be
                   non-zero (a zero value raises ``ZeroDivisionError``).
    :returns: a string of exactly ``length`` characters for non-negative
              ``length`` and positive ``repeat``.
    """
    # NOTE(review): ``rnd`` is expected to be bound at module level (its
    # import is not part of this block) -- presumably the ``random`` module.
    pattern = ''.join(chr(ord('a') + rnd.randint(0, 25))
                      for _ in range(repeat))

    # Tile the pattern as many whole times as fit, then append the leading
    # part of the pattern to cover the remainder.
    whole, rem = divmod(length, repeat)
    return pattern * whole + pattern[:rem]
def recursive_remove(path):
    """Delete everything below ``path`` while keeping ``path`` itself.

    The tree is walked bottom-up so that every directory is already empty
    by the time it is removed.

    :param path: directory whose contents are removed. A path that does not
                 exist is silently ignored, because ``os.walk`` yields
                 nothing for it.
    :raises OSError: if a file or directory cannot be removed.
    """
    for root, dirs, files in os.walk(path, topdown=False):
        # Files first, then the (now empty) sub-directories of this level.
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            os.rmdir(os.path.join(root, name))
def merkle_hash(hashes):
    """Return the SHA-256 Merkle root of a list of binary digests.

    * An empty list hashes to ``sha256('')``.
    * A single digest is returned unchanged.
    * Otherwise the list is padded with all-zero digests up to the next
      power of two and reduced pairwise until one digest remains.

    :param hashes: list of equal-length binary digests (leaves, in order).
                   The caller's list is not modified.
    :returns: the binary root digest.
    """
    if len(hashes) == 0:
        return sha256(b'').digest()
    if len(hashes) == 1:
        return hashes[0]

    # Smallest power of two that can hold all the leaves.
    s = 2
    while s < len(hashes):
        s = s * 2

    # Work on a copy: padding in place would extend the caller's list.
    level = list(hashes)
    level += [b'\x00' * len(level[0])] * (s - len(level))

    # Hash adjacent pairs, halving the level each round.
    while len(level) > 1:
        level = [sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]
# Test-environment setup: point archipelago's log and pidfile locations at
# ./logs and ./pids under the current working directory.
# NOTE(review): whether these statements run at import time or inside a
# helper function is not visible in this excerpt -- confirm before moving them.
89 # archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
90 archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
91 archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
# Create either directory if it does not exist yet.
92 if not os.path.isdir(archipelago.common.LOGS_PATH):
93 os.makedirs(archipelago.common.LOGS_PATH)
94 if not os.path.isdir(archipelago.common.PIDFILE_PATH):
95 os.makedirs(archipelago.common.PIDFILE_PATH)
# Clear out whatever is left in the log directory from a previous run.
97 recursive_remove(archipelago.common.LOGS_PATH)
# Base TestCase for the peer tests in this file. It owns the xseg segment and
# context, builds expected reply structures, and provides send_* helpers plus
# send_and_evaluate_* wrappers that check the outcome of each request.
# NOTE(review): several short lines (method headers, decorators, returns) are
# not visible in this excerpt; comments below describe only the visible code.
99 class XsegTest(unittest.TestCase):
# Segment spec string; its fields mirror the Segment(...) arguments below.
100 spec = "posix:testsegment:8:16:256:12".encode()
# Block size used by the map-reply helpers (4 MiB).
101 blocksize = 4*1024*1024
# Fixture setup (presumably setUp): create the segment; if that raises,
# destroy the stale segment and create it again, then open an xseg context.
105 self.segment = Segment('posix', 'testsegment', 8, 16, 256, 12)
107 self.segment.create()
108 except Exception as e:
109 self.segment.destroy()
110 self.segment.create()
111 self.xseg = Xseg_ctx(self.segment)
# Fixture teardown (presumably tearDown): destroy the segment.
117 self.segment.destroy()
# Build the expected reply structure for an info request.
120 def get_reply_info(size):
121 xinfo = xseg_reply_info()
# Build the expected reply structure for a hash request: target holds the
# hash string and targetlen its length.
126 def get_hash_reply(hashstring):
127 xhash = xseg_reply_hash()
128 xhash.target = hashstring
129 xhash.targetlen = len(hashstring)
# Compose an object name of the form "archip_<volume>_<epoch>_<index>", where
# epoch and index are the hex encoding of the raw in-memory bytes of each
# value stored as a c_uint64.
133 def get_object_name(volume, epoch, index):
134 epoch_64 = ctypes.c_uint64(epoch)
135 index_64 = ctypes.c_uint64(index)
136 epoch_64_char = ctypes.cast(ctypes.addressof(epoch_64), ctypes.c_char_p)
137 index_64_char = ctypes.cast(ctypes.addressof(index_64), ctypes.c_char_p)
138 epoch_64_str = ctypes.string_at(epoch_64_char, ctypes.sizeof(ctypes.c_uint64))
139 index_64_str = ctypes.string_at(index_64_char, ctypes.sizeof(ctypes.c_uint64))
140 epoch_hex = hexlify(epoch_64_str)
141 index_hex = hexlify(index_64_str)
142 return "archip_" + volume + "_" + epoch_hex + "_" + index_hex
# Build the expected map reply for the byte range [offset, offset + size):
# one scatterlist entry per block touched, each with its in-block offset and
# size. Targets are filled in by the callers below.
145 def get_map_reply(offset, size):
146 blocksize = XsegTest.blocksize
147 ret = xseg_reply_map()
# Whole blocks between the first and last block index; the check below deals
# with a range that does not end on a block boundary.
148 cnt = (offset+size)//blocksize - offset//blocksize
149 if (offset+size) % blocksize > 0 :
152 SegsArray = xseg_reply_map_scatterlist * cnt
# From here on 'offset' is the offset inside the first block.
155 offset = offset % blocksize
156 for i in range(0, cnt):
157 segs[i].offset = offset
158 segs[i].size = blocksize - offset
# Clamp the entry to the bytes still to be mapped.
159 if segs[i].size > rem_size:
160 segs[i].size = rem_size
162 rem_size -= segs[i].size
# NOTE(review): Error is presumably imported from archipelago.common on the
# continuation of the first import line, which is not visible here.
164 raise Error("Calculation error")
# Collect the target strings of every scatterlist entry of a map reply.
170 def get_list_of_hashes(xreply, from_segment=False):
175 SegsArray = xseg_reply_map_scatterlist * cnt
176 array = SegsArray.from_address(ctypes.addressof(segs))
178 for i in range(0, cnt):
179 hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
# Expected map reply for a range that has never been written: every entry
# points at the zero-block name, the hex SHA-256 of the empty string.
183 def get_zero_map_reply(offset, size):
184 ret = XsegTest.get_map_reply(offset, size);
186 for i in range(0, cnt):
187 ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
188 ret.segs[i].targetlen = len(ret.segs[i].target)
# Expected map reply after a copy-up: every entry points at the object name
# derived from volume and epoch (see get_object_name).
192 def get_copy_map_reply(volume, offset, size, epoch):
193 blocksize = XsegTest.blocksize
194 objidx_start = offset//blocksize
195 ret = XsegTest.get_map_reply(offset, size);
197 for i in range(0, cnt):
# get_object_name is defined on XsegTest; MapperdTest reaches it through
# inheritance.
198 ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
200 ret.segs[i].targetlen = len(ret.segs[i].target)
# Generic Request constructor for an arbitrary operation code.
203 def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
205 return Request(self.xseg, dst, target, data=data, size=size,
206 offset=offset, datalen=datalen, flags=flags, op=op)
# Compare the data carried by a completed request against an expected reply
# structure; the comparison depends on the type of expected_data.
208 def assert_equal_xseg(self, req, expected_data):
# Info reply: same data length and same reported size.
209 if isinstance(expected_data, xseg_reply_info):
210 datasize = ctypes.sizeof(expected_data)
211 self.assertEqual(datasize, req.get_datalen())
212 data = req.get_data(type(expected_data)).contents
213 self.assertEqual(data.size, expected_data.size)
# Map reply: same entry count and, per entry, same target, offset and size.
214 elif isinstance(expected_data, xseg_reply_map):
215 #since xseg_reply_map uses a flexible array for the
216 #xseg_reply_map_scatterlist reply, we calculate the size of the
217 #reply in the segment, by subtracting the size of the pointer to
218 #the array, in the python object
219 datasize = ctypes.sizeof(expected_data)
220 datasize -= ctypes.sizeof(expected_data.segs)
221 datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
222 self.assertEqual(datasize, req.get_datalen())
223 data = req.get_data(type(expected_data)).contents
225 self.assertEqual(data.cnt, expected_data.cnt)
227 SegsArray = xseg_reply_map_scatterlist * cnt
228 array = SegsArray.from_address(ctypes.addressof(segs))
229 expected_array = expected_data.segs
230 for i in range(0, cnt):
231 t = ctypes.string_at(array[i].target, array[i].targetlen)
232 self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
233 self.assertEqual(t, expected_array[i].target)
234 self.assertEqual(array[i].offset, expected_array[i].offset)
235 self.assertEqual(array[i].size, expected_array[i].size)
# Hash reply: same data length, target length and target bytes.
236 elif isinstance(expected_data, xseg_reply_hash):
237 datasize = ctypes.sizeof(expected_data)
238 self.assertEqual(datasize, req.get_datalen())
239 data = req.get_data(type(expected_data)).contents
240 self.assertEqual(data.targetlen, expected_data.targetlen)
241 t = ctypes.string_at(data.target, data.targetlen)
242 et = ctypes.string_at(expected_data.target, expected_data.targetlen)
243 self.assertEqual(t, et)
245 raise Error("Unknown data type")
# Check a completed request: its success flag, optionally the serviced byte
# count, and optionally its data (raw string or reply structure).
247 def evaluate_req(self, req, success=True, serviced=None, data=None):
249 self.assertFalse(req.success())
252 self.assertTrue(req.success())
253 if serviced is not None:
254 self.assertEqual(req.get_serviced(), serviced)
# NOTE(review): basestring exists only in Python 2.
256 if isinstance(data, basestring):
258 self.assertEqual(datalen, req.get_datalen())
259 self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
261 self.assert_equal_xseg(req, data)
# Wrap a send_* helper into a send_and_evaluate_* method: send the request,
# evaluate the reply against the expectations, then put the request back.
263 def evaluate(send_func):
264 def send_and_evaluate(self, dst, target, expected=True, serviced=None,
265 expected_data=None, **kwargs):
266 req = send_func(self, dst, target, **kwargs)
268 self.evaluate_req(req, success=expected, serviced=serviced,
270 self.assertTrue(req.put())
271 return send_and_evaluate
# The send_* helpers below each build one request type through the matching
# Request.get_*_request factory; each is paired with an evaluate() wrapper.
273 def send_write(self, dst, target, data=None, offset=0, datalen=0, flags=0):
274 #assert datalen >= size
275 # req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
276 req = Request.get_write_request(self.xseg, dst, target, data=data,
277 offset=offset, datalen=datalen, flags=flags)
281 send_and_evaluate_write = evaluate(send_write)
283 def send_read(self, dst, target, size=0, datalen=0, offset=0):
284 #assert datalen >= size
285 # req = self.get_req(X_READ, dst, target, data=None, size=size, offset=offset, datalen=datalen)
286 req = Request.get_read_request(self.xseg, dst, target, size=size,
287 offset=offset, datalen=datalen)
291 send_and_evaluate_read = evaluate(send_read)
293 def send_info(self, dst, target):
294 #req = self.get_req(X_INFO, dst, target, data=None, size=0)
295 req = Request.get_info_request(self.xseg, dst, target)
299 send_and_evaluate_info = evaluate(send_info)
301 def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
302 #datalen = ctypes.sizeof(xseg_request_copy)
303 #xcopy = xseg_request_copy()
304 #xcopy.target = src_target
305 #xcopy.targetlen = len(src_target)
306 # req = self.get_req(X_COPY, dst, dst_target, data=xcopy, datalen=datalen,
307 # offset=offset, size=size)
308 req = Request.get_copy_request(self.xseg, dst, src_target,
309 copy_target=dst_target, size=size, offset=offset)
313 send_and_evaluate_copy = evaluate(send_copy)
315 def send_acquire(self, dst, target):
316 #req = self.get_req(X_ACQUIRE, dst, target, flags=XF_NOSYNC)
317 req = Request.get_acquire_request(self.xseg, dst, target)
321 send_and_evaluate_acquire = evaluate(send_acquire)
323 def send_release(self, dst, target, force=False):
324 #req_flags = XF_NOSYNC
326 #req_flags |= XF_FORCE
327 #req = self.get_req(X_RELEASE, dst, target, size=0, flags=req_flags)
328 req = Request.get_release_request(self.xseg, dst, target, force)
332 send_and_evaluate_release = evaluate(send_release)
334 def send_delete(self, dst, target):
335 #req = self.get_req(X_DELETE, dst, target)
336 req = Request.get_delete_request(self.xseg, dst, target)
340 send_and_evaluate_delete = evaluate(send_delete)
342 def send_clone(self, dst, src_target, clone=None, clone_size=0,
344 #xclone = xseg_request_clone()
345 #xclone.target = src_target
346 #xclone.targetlen = len(src_target)
347 #xclone.size = clone_size
349 #req = self.get_req(X_CLONE, dst, clone, data=xclone,
350 #datalen=ctypes.sizeof(xclone))
351 req = Request.get_clone_request(self.xseg, dst, src_target,
352 clone=clone, clone_size=clone_size, cont_addr=cont_addr)
356 send_and_evaluate_clone = evaluate(send_clone)
358 def send_snapshot(self, dst, src_target, snap=None):
359 #xsnapshot = xseg_request_snapshot()
360 #xsnapshot.target = snap
361 #xsnapshot.targetlen = len(snap)
363 #req = self.get_req(X_SNAPSHOT, dst, src_target, data=xsnapshot,
364 #datalen=ctypes.sizeof(xsnapshot))
365 req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
369 send_and_evaluate_snapshot = evaluate(send_snapshot)
371 def send_open(self, dst, target):
372 #req = self.get_req(X_OPEN, dst, target)
373 req = Request.get_open_request(self.xseg, dst, target)
377 send_and_evaluate_open = evaluate(send_open)
379 def send_close(self, dst, target):
380 #req = self.get_req(X_CLOSE, dst, target)
381 req = Request.get_close_request(self.xseg, dst, target)
385 send_and_evaluate_close = evaluate(send_close)
387 def send_map_read(self, dst, target, offset=0, size=0):
388 #req = self.get_req(X_MAPR, dst, target, size=size, offset=offset,
390 req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
395 send_and_evaluate_map_read = evaluate(send_map_read)
397 def send_map_write(self, dst, target, offset=0, size=0):
398 #req = self.get_req(X_MAPW, dst, target, size=size, offset=offset,
400 req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
405 send_and_evaluate_map_write = evaluate(send_map_write)
407 def send_hash(self, dst, target, size=0):
408 #req = self.get_req(X_hash, dst, target, data=None, size=0)
409 req = Request.get_hash_request(self.xseg, dst, target, size=size)
413 send_and_evaluate_hash = evaluate(send_hash)
# Peer factories. get_filed works on the directory named by
# args['archip_dir'] and can empty it via recursive_remove.
415 def get_filed(self, args, clean=False):
416 path = args['archip_dir']
417 if not os.path.exists(path):
421 recursive_remove(path)
# get_sosd talks to the Ceph cluster configured in /etc/ceph/ceph.conf and
# recreates the pool when it already exists.
425 def get_sosd(self, args, clean=False):
428 cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
430 if cluster.pool_exists(pool):
431 cluster.delete_pool(pool)
432 cluster.create_pool(pool)
437 def get_mapperd(self, args):
438 return Mapperd(**args)
440 def get_vlmcd(self, args):
# Tests for the vlmcd peer. The fixture starts four peers: two filed blockers
# (blockerm and blockerb), a mapperd and a vlmcd.
# NOTE(review): test-method headers and most local constants (volume names,
# sizes, offsets) are not visible in this excerpt.
443 class VlmcdTest(XsegTest):
# Argument dictionaries for the four peers (only some keys visible). The
# setUp below reads them as bfiled_args, mfiled_args, mapperd_args, vlmcd_args.
445 'role': 'vlmctest-blockerb',
446 'spec': XsegTest.spec,
448 'archip_dir': '/tmp/bfiledtest/',
456 'role': 'vlmctest-blockerm',
457 'spec': XsegTest.spec,
459 'archip_dir': '/tmp/mfiledtest/',
467 'role': 'vlmctest-mapper',
468 'spec': XsegTest.spec,
478 'role': 'vlmctest-vlmc',
479 'spec': XsegTest.spec,
# Fixture setup: build the peers, remember their ports, then start them in
# the order blockerm, blockerb, mapperd, vlmcd.
490 super(VlmcdTest, self).setUp()
492 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
493 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
494 self.mapperd = self.get_mapperd(self.mapperd_args)
495 self.vlmcd = self.get_vlmcd(self.vlmcd_args)
496 self.vlmcdport = self.vlmcd.portno_start
497 self.mapperdport = self.mapperd.portno_start
498 self.blockerbport = self.blockerb.portno_start
499 start_peer(self.blockerm)
500 start_peer(self.blockerb)
501 start_peer(self.mapperd)
502 start_peer(self.vlmcd)
# If anything above raises, stop the peers in reverse start order and tear
# down the base fixture.
503 except Exception as e:
505 stop_peer(self.vlmcd)
506 stop_peer(self.mapperd)
507 stop_peer(self.blockerb)
508 stop_peer(self.blockerm)
509 super(VlmcdTest, self).tearDown()
# Fixture teardown: stop the peers in reverse start order.
513 stop_peer(self.vlmcd)
514 stop_peer(self.mapperd)
515 stop_peer(self.blockerb)
516 stop_peer(self.blockerm)
517 super(VlmcdTest, self).tearDown()
# Open: fails before the volume is created through the mapper, then succeeds
# twice in a row.
521 volsize = 10*1024*1024
522 self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
523 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
525 self.send_and_evaluate_open(self.vlmcdport, volume)
526 self.send_and_evaluate_open(self.vlmcdport, volume)
# Close: fails unless the volume exists and is currently open.
528 def test_close(self):
530 volsize = 10*1024*1024
531 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
532 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
534 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
535 self.send_and_evaluate_open(self.vlmcdport, volume)
536 self.send_and_evaluate_close(self.vlmcdport, volume)
537 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
# Info: vlmcd reports the size the volume was created with.
541 volsize = 10*1024*1024
542 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
544 xinfo = self.get_reply_info(volsize)
545 self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)
# Write/read are issued both before and after the volume is created.
547 def test_write_read(self):
549 data = get_random_string(datalen, 16)
551 volsize = 10*1024*1024
553 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
555 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
557 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
559 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
561 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
# Snapshots must keep the data they captured while the volume and its clones
# diverge.
564 def test_clone_snapshot(self):
# NOTE(review): snap2 is assigned twice in a row, so "mysnapshot2" is never
# used; the second assignment was possibly meant for another name.
567 snap2 = "mysnapshot2"
568 snap2 = "mysnapshot3"
572 volsize = 100*1024*1024*1024
573 clone2size = 200*1024*1024*1024
574 offset = 90*1024*1024*1024
577 zeros = '\x00' * size
578 data = get_random_string(size, 16)
579 data2 = get_random_string(size, 16)
581 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
# A fresh volume reads back zeros.
583 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
584 offset=offset, expected_data=zeros)
# The first snapshot keeps reading zeros after the volume is written.
586 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
587 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
588 offset=offset, expected_data=zeros)
589 self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
591 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
592 offset=offset, expected_data=zeros)
593 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
594 offset=offset, expected_data=data)
# The second snapshot captures the written data; a larger clone of it reads
# that data back and zeros beyond the original volume size.
596 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
597 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
598 offset=offset, expected_data=data)
599 self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
600 clone_size=clone2size)
601 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
602 offset=offset, expected_data=data)
603 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
604 offset=volsize+offset, expected_data=zeros)
# Writing to the clone leaves the snapshots and the origin volume unchanged.
606 self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
607 offset=offset, serviced=size)
608 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
609 offset=offset, expected_data=data2)
610 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
611 offset=offset, expected_data=data)
612 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
613 offset=offset, expected_data=data)
614 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
615 offset=offset, expected_data=zeros)
# Two info requests are submitted before either reply is collected.
617 def test_info2(self):
619 volsize = 10*1024*1024
620 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
622 xinfo = self.get_reply_info(volsize)
624 reqs.add(self.send_info(self.vlmcdport, volume))
625 reqs.add(self.send_info(self.vlmcdport, volume))
627 req = self.xseg.wait_requests(reqs)
628 self.evaluate_req(req, data=xinfo)
630 self.assertTrue(req.put())
# A flush is an empty write carrying the XF_FLUSH flag.
632 def test_flush(self):
634 data = get_random_string(datalen, 16)
636 volsize = 10*1024*1024
638 #This may seems weird, but actually vlmcd flush, only guarantees that
639 #there are no pending operation the volume. On a volume that does not
640 #exists, this is always true, so this should succeed.
641 self.send_and_evaluate_write(self.vlmcdport, volume, data="",
642 flags=XF_FLUSH, expected=True)
643 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
645 self.send_and_evaluate_write(self.vlmcdport, volume, data="",
647 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
649 self.send_and_evaluate_write(self.vlmcdport, volume, data="",
# Queue a batch of writes with one flush in between, then evaluate replies.
652 def test_flush2(self):
654 volsize = 10*1024*1024
656 data = get_random_string(datalen, 16)
658 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
660 xinfo = self.get_reply_info(volsize)
662 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
663 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
664 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
665 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
666 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
667 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
668 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
669 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
670 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
671 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
672 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
673 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
674 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
# The single flush request, issued between the data writes.
675 reqs.add(self.send_write(self.vlmcdport, volume, data="", flags=XF_FLUSH))
676 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
677 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
678 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
679 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
680 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
681 reqs.add(self.send_write(self.vlmcdport, volume, data=data))
683 req = self.xseg.wait_requests(reqs)
684 self.evaluate_req(req)
686 self.assertTrue(req.put())
# Hash test: the hash the mapper reports for a snapshot must equal the Merkle
# hash of its block hashes, and a volume can be created from that hash.
689 blocksize = self.blocksize
691 volume2 = "myvolume2"
694 volsize = 10*1024*1024
698 data = get_random_string(size, 16)
700 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
702 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
703 offset=offset, serviced=size)
705 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
707 self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
709 req = self.send_hash(self.mapperdport, snap, size=volsize)
711 self.assertTrue(req.success())
712 xreply = req.get_data(xseg_reply_hash).contents
713 hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
# Read the snapshot's map to get the name of every block.
716 req = self.send_map_read(self.mapperdport, snap, offset=0,
719 self.assertTrue(req.success())
720 xreply = req.get_data(xseg_reply_map).contents
721 blocks = self.get_list_of_hashes(xreply, from_segment=True)
# The zero block's name is already its hash; other blocks are hashed by the
# blocker.
725 if (b == sha256('').hexdigest()):
726 h.append(unhexlify(b))
728 req = self.send_hash(self.blockerbport, b, size=blocksize)
730 self.assertTrue(req.success())
731 xreply = req.get_data(xseg_reply_hash).contents
732 h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
735 mh = hexlify(merkle_hash(h))
736 self.assertEqual(hash_map, mh)
# Cloning from the hash only succeeds with cont_addr=True; the new volume then
# reads back the original data, followed by zeros.
738 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
739 clone_size=volsize * 2, expected=False)
740 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
741 clone_size=volsize * 2, cont_addr=True)
742 self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
743 offset=offset, expected_data=data)
744 self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
745 offset=offset + size, expected_data='\x00' * (volsize - size))
# Tests for the mapperd peer. The fixture starts two filed blockers (blockerm
# and blockerb) and a mapperd.
# NOTE(review): test-method headers and most local constants (volume names,
# sizes, offsets, epochs) are not visible in this excerpt.
748 class MapperdTest(XsegTest):
# Argument dictionaries for the three peers (only some keys visible).
750 'role': 'mappertest-blockerb',
751 'spec': XsegTest.spec,
753 'archip_dir': '/tmp/bfiledtest/',
761 'role': 'mappertest-blockerm',
762 'spec': XsegTest.spec,
764 'archip_dir': '/tmp/mfiledtest/',
772 'role': 'mappertest-mapper',
773 'spec': XsegTest.spec,
# Same value as XsegTest.blocksize (4 MiB).
782 blocksize = 4*1024*1024
# Fixture setup: build the peers, remember the mapper port, then start them
# in the order blockerm, blockerb, mapperd.
785 super(MapperdTest, self).setUp()
787 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
788 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
789 self.mapperd = self.get_mapperd(self.mapperd_args)
790 self.mapperdport = self.mapperd.portno_start
791 start_peer(self.blockerm)
792 start_peer(self.blockerb)
793 start_peer(self.mapperd)
# If anything above raises, stop the peers in reverse start order and tear
# down the base fixture.
794 except Exception as e:
796 stop_peer(self.mapperd)
797 stop_peer(self.blockerb)
798 stop_peer(self.blockerm)
799 super(MapperdTest, self).tearDown()
# Fixture teardown: stop the peers in reverse start order.
803 stop_peer(self.mapperd)
804 stop_peer(self.blockerb)
805 stop_peer(self.blockerm)
806 super(MapperdTest, self).tearDown()
# Create: a zero-sized volume is rejected, and so is a second create of a
# volume that already exists.
808 def test_create(self):
810 volsize = 10*1024*1024
811 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
812 clone_size=0, expected=False)
813 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
815 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
816 clone_size=volsize, expected=False)
# Delete: a deleted volume can neither be deleted again nor map-written, and
# it can be re-created afterwards.
818 def test_delete(self):
820 volsize = 10*1024*1024
825 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
826 clone_size=0, expected=False)
827 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
829 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
830 clone_size=volsize, expected=False)
831 self.send_and_evaluate_delete(self.mapperdport, volume)
832 self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
833 self.send_and_evaluate_map_write(self.mapperdport, volume,
834 offset=offset, size=size, expected=False)
835 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
838 ret = self.get_copy_map_reply(volume, offset, size, epoch)
840 self.send_and_evaluate_map_write(self.mapperdport, volume,
841 expected_data=ret, offset=offset, size=size)
# Snapshot and clone behaviour at the mapper level.
843 def test_clone_snapshot(self):
# NOTE(review): snap2 is assigned twice in a row, so "mysnapshot2" is never
# used; the second assignment was possibly meant for another name.
846 snap2 = "mysnapshot2"
847 snap2 = "mysnapshot3"
850 volsize = 100*1024*1024*1024
851 clone2size = 200*1024*1024*1024
852 offset = 90*1024*1024*1024
858 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
860 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
861 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
# The snapshot reports the volume's size, and both map to zero blocks.
863 xinfo = self.get_reply_info(volsize)
864 self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
865 ret = self.get_zero_map_reply(offset, size)
866 self.send_and_evaluate_map_read(self.mapperdport, volume,
867 expected_data=ret, offset=offset, size=size)
868 self.send_and_evaluate_map_read(self.mapperdport, snap,
869 expected_data=ret, offset=offset, size=size)
# A map-write returns the copied-up object names.
871 ret = self.get_copy_map_reply(volume, offset, size, epoch)
872 self.send_and_evaluate_map_write(self.mapperdport, volume,
873 expected_data=ret, offset=offset, size=size)
# The same mapping must be returned after the mapper is restarted.
875 stop_peer(self.mapperd)
876 start_peer(self.mapperd)
877 self.send_and_evaluate_map_read(self.mapperdport, volume,
878 expected_data=ret, offset=offset, size=size)
# A clone without an explicit size reports the snapshot's size; a clone_size
# of 2 is rejected and clone2size is accepted.
880 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
881 xinfo = self.get_reply_info(volsize)
882 self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
883 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
884 clone_size=2, expected=False)
885 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
886 clone_size=clone2size)
887 xinfo = self.get_reply_info(clone2size)
888 self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
# Info: the mapper reports the size the volume was created with.
892 volsize = 10*1024*1024
893 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
895 xinfo = self.get_reply_info(volsize)
896 self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
# Two info requests are submitted before either reply is collected.
898 def test_info2(self):
900 volsize = 10*1024*1024
901 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
903 xinfo = self.get_reply_info(volsize)
905 reqs.add(self.send_info(self.mapperdport, volume))
906 reqs.add(self.send_info(self.mapperdport, volume))
908 req = self.xseg.wait_requests(reqs)
909 self.evaluate_req(req, data=xinfo)
911 self.assertTrue(req.put())
# Open: fails before the volume exists, then succeeds twice in a row.
915 volsize = 10*1024*1024
916 self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
917 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
919 self.send_and_evaluate_open(self.mapperdport, volume)
920 self.send_and_evaluate_open(self.mapperdport, volume)
# Two open requests are submitted before either reply is collected.
922 def test_open2(self):
924 volsize = 10*1024*1024
925 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
928 reqs.add(self.send_open(self.mapperdport, volume))
929 reqs.add(self.send_open(self.mapperdport, volume))
931 req = self.xseg.wait_requests(reqs)
932 self.evaluate_req(req)
934 self.assertTrue(req.put())
# Close: fails unless the volume exists and is currently open.
936 def test_close(self):
938 volsize = 10*1024*1024
939 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
940 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
942 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
943 self.send_and_evaluate_open(self.mapperdport, volume)
944 self.send_and_evaluate_close(self.mapperdport, volume)
945 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
# Map-read: fails before the volume exists, returns zero blocks on a fresh
# volume, and fails for the two later requests (their offset/size lines are
# not visible here).
949 volsize = 10*1024*1024
952 ret = MapperdTest.get_zero_map_reply(offset, size)
953 self.send_and_evaluate_map_read(self.mapperdport, volume,
954 offset=offset, size=size, expected=False)
955 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
957 self.send_and_evaluate_map_read(self.mapperdport, volume,
958 expected_data=ret, offset=offset, size=size)
960 self.send_and_evaluate_map_read(self.mapperdport, volume,
961 offset=offset, size=size, expected=False)
963 self.send_and_evaluate_map_read(self.mapperdport, volume,
964 offset=offset, size=size, expected=False)
# Two map-read requests are submitted before either reply is collected.
966 def test_mapr2(self):
968 volsize = 10*1024*1024
972 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
975 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
977 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
979 ret = MapperdTest.get_zero_map_reply(offset, size)
981 req = self.xseg.wait_requests(reqs)
982 self.evaluate_req(req, data=ret)
984 self.assertTrue(req.put())
# Map-write: fails before the volume exists; afterwards the copied-up mapping
# is returned, survives a mapper restart, and out-of-range offsets fail.
987 blocksize = self.blocksize
989 volsize = 100*1024*1024*1024
990 offset = 90*1024*1024*1024 - 2
994 ret = self.get_copy_map_reply(volume, offset, size, epoch)
996 self.send_and_evaluate_map_write(self.mapperdport, volume,
997 offset=offset, size=size, expected=False)
998 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
1000 self.send_and_evaluate_map_write(self.mapperdport, volume,
1001 expected_data=ret, offset=offset, size=size)
1002 self.send_and_evaluate_map_read(self.mapperdport, volume,
1003 expected_data = ret, offset=offset, size=size)
1004 stop_peer(self.mapperd)
1005 start_peer(self.mapperd)
1006 self.send_and_evaluate_map_read(self.mapperdport, volume,
1007 expected_data=ret, offset=offset, size=size)
1008 self.send_and_evaluate_open(self.mapperdport, volume)
# An offset past the end of the volume is rejected.
1009 offset = 101*1024*1024*1024
1010 self.send_and_evaluate_map_write(self.mapperdport, volume,
1011 offset=offset, size=size, expected=False)
# So is an offset one byte before the end of the volume.
1012 offset = 100*1024*1024*1024 - 1
1013 self.send_and_evaluate_map_write(self.mapperdport, volume,
1014 offset=offset, size=size, expected=False)
# Several identical map-writes are queued; each reply must carry the same
# mapping.
1016 def test_mapw2(self):
1017 blocksize = self.blocksize
1019 volsize = 100*1024*1024*1024
1020 offset = 90*1024*1024*1024 - 2
1024 ret = self.get_copy_map_reply(volume, offset, size, epoch)
1026 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
1030 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1032 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1034 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1036 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1038 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1040 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1042 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
# Drain the replies one at a time until none are outstanding.
1044 while len(reqs) > 0:
1045 req = self.xseg.wait_requests(reqs)
1046 self.evaluate_req(req, data=ret)
1048 self.assertTrue(req.put())
# Mixin with the tests shared by the blocker peers. It derives from object and
# relies on the concrete test class to provide self.blocker and
# self.blockerport (FiledTest and SosdTest set them in their fixtures).
# NOTE(review): the datalen/target assignments are not visible in this excerpt.
1050 class BlockerTest(object):
# Written data can be read back, and info reports its length, both before and
# after the blocker is restarted.
1051 def test_write_read(self):
1053 data = get_random_string(datalen, 16)
1055 xinfo = self.get_reply_info(datalen)
1057 self.send_and_evaluate_write(self.blockerport, target, data=data,
1059 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1061 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1062 stop_peer(self.blocker)
1063 start_peer(self.blocker)
1064 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1066 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
# Info reports the number of bytes written to the target.
1068 def test_info(self):
1070 data = get_random_string(datalen, 16)
1072 self.send_and_evaluate_write(self.blockerport, target, data=data,
1074 xinfo = self.get_reply_info(datalen)
1075 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
# Copy a target onto copy_target and read the copy back.
1077 def test_copy(self):
1079 data = get_random_string(datalen, 16)
1081 copy_target = "copy_target"
1083 self.send_and_evaluate_write(self.blockerport, target, data=data,
1085 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1086 expected_data=data, serviced=datalen)
1087 self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1088 size=datalen, serviced=datalen)
# A copy one byte longer than the source is expected to be serviced in full.
1089 self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1090 size=datalen+1, serviced=datalen+1)
1091 self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
# Delete fails on a missing target and succeeds on an existing one; a read
# afterwards returns zeros.
1095 def test_delete(self):
1097 data = get_random_string(datalen, 16)
# The positional False/True below bind to the wrapper's 'expected' parameter.
1099 self.send_and_evaluate_delete(self.blockerport, target, False)
1100 self.send_and_evaluate_write(self.blockerport, target, data=data)
1101 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1103 self.send_and_evaluate_delete(self.blockerport, target, True)
1104 data = '\x00' * datalen
1105 self.send_and_evaluate_read(self.blockerport, target, size=datalen, expected_data=data)
# The expected hash is the SHA-256 of the data with trailing NUL bytes
# stripped, so an all-zero target hashes like empty input.
1107 def test_hash(self):
1109 data = '\x00'*datalen
1110 target = "target_zeros"
1113 self.send_and_evaluate_write(self.blockerport, target, data=data,
1115 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1116 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1117 expected_data=ret, serviced=datalen)
# Random data: the same hash request is repeated and must keep returning the
# same result.
1120 data = get_random_string(datalen, 16)
1121 self.send_and_evaluate_write(self.blockerport, target, data=data,
1123 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1124 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1125 expected_data=ret, serviced=datalen)
1126 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1127 expected_data=ret, serviced=datalen)
1128 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1129 expected_data=ret, serviced=datalen)
# Lock semantics: a second acquire succeeds, a second release fails, and after
# a restart the held lock can no longer be acquired or released normally.
# FiledTest overrides this test with its own version.
1131 def test_locking(self):
1133 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1134 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1135 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1136 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1137 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1138 stop_peer(self.blocker)
1139 start_peer(self.blocker)
1140 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1141 self.send_and_evaluate_release(self.blockerport, target, expected=False)
# Finally a release with force=True is attempted.
1142 self.send_and_evaluate_release(self.blockerport, target, force=True,
# Runs the BlockerTest suite against a filed peer and overrides test_locking.
1146 class FiledTest(BlockerTest, XsegTest):
# Arguments for the filed peer (only some keys visible); setUp reads them as
# self.filed_args.
1148 'role': 'testfiled',
1149 'spec': XsegTest.spec,
1151 'archip_dir': '/tmp/filedtest/',
1152 'prefix': 'archip_',
# Fixture setup: build the filed peer, remember its port and start it.
1160 super(FiledTest, self).setUp()
1162 self.blocker = self.get_filed(self.filed_args, clean=True)
1163 self.blockerport = self.blocker.portno_start
1164 start_peer(self.blocker)
# If anything above raises, the base fixture is torn down.
1165 except Exception as e:
1166 super(FiledTest, self).tearDown()
# Fixture teardown: stop the peer, then tear down the base fixture.
1170 stop_peer(self.blocker)
1171 super(FiledTest, self).tearDown()
# Same sequence as BlockerTest.test_locking, except that the peer is not
# restarted as-is: a new Filed is created from a copy of the arguments with
# 'unique_str' set to a different value.
1173 def test_locking(self):
1175 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1176 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1177 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1178 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1179 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1180 stop_peer(self.blocker)
# Copy the dict so the class-level arguments are left untouched.
1181 new_filed_args = copy(self.filed_args)
1182 new_filed_args['unique_str'] = 'ThisisSparta'
1183 self.blocker = Filed(**new_filed_args)
1184 start_peer(self.blocker)
1185 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1186 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1187 self.send_and_evaluate_release(self.blockerport, target, force=True,
# Runs the BlockerTest suite against a sosd peer.
1190 class SosdTest(BlockerTest, XsegTest):
# Arguments for the sosd peer (only some keys visible), including the pool.
1193 'spec': XsegTest.spec,
1199 'pool': 'test_sosd',
# Fixture setup: build the sosd peer, remember its port and start it.
1204 super(SosdTest, self).setUp()
# NOTE(review): the arguments are read from self.filed_args; the line naming
# this class's dict is not visible here -- confirm the attribute exists.
1206 self.blocker = self.get_sosd(self.filed_args, clean=True)
1207 self.blockerport = self.blocker.portno_start
1208 start_peer(self.blocker)
# If anything above raises, the base fixture is torn down.
1209 except Exception as e:
1210 super(SosdTest, self).tearDown()
# Fixture teardown: stop the peer, then tear down the base fixture.
1214 stop_peer(self.blocker)
1215 super(SosdTest, self).tearDown()
1217 if __name__=='__main__':