1 # Copyright 2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
37 from archipelago.archipelago import start_peer, stop_peer
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
49 def get_random_string(length=64, repeat=16):
50 nr_repeats = length//repeat
53 for i in range(repeat):
54 l.append(chr(ord('a') + rnd.randint(0,25)))
55 random_string = ''.join(l)
58 for i in range(nr_repeats):
59 l.append(random_string)
61 l.append(random_string[0:rem])
65 def recursive_remove(path):
66 for root, dirs, files in os.walk(path, topdown=False):
68 os.remove(os.path.join(root, name))
70 os.rmdir(os.path.join(root, name))
72 def merkle_hash(hashes):
74 return sha256('').digest()
79 while s < len(hashes):
81 hashes += [('\x00' * len(hashes[0]))] * (s - len(hashes))
82 while len(hashes) > 1 :
83 hashes = [sha256(hashes[i] + hashes[i + 1]).digest() for i in range (0, len(hashes), 2)]
89 # archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
90 archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
91 archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
92 if not os.path.isdir(archipelago.common.LOGS_PATH):
93 os.makedirs(archipelago.common.LOGS_PATH)
94 if not os.path.isdir(archipelago.common.PIDFILE_PATH):
95 os.makedirs(archipelago.common.PIDFILE_PATH)
97 recursive_remove(archipelago.common.LOGS_PATH)
99 class XsegTest(unittest.TestCase):
101 spec = "posix:testsegment:16:256:12".encode()
102 blocksize = 4*1024*1024
106 self.segment = Segment('posix', 'testsegment', 16, 256, 12)
108 self.segment.create()
109 except Exception as e:
110 self.segment.destroy()
111 self.segment.create()
112 self.xseg = Xseg_ctx(self.segment, self.myport)
118 self.segment.destroy()
121 def get_reply_info(size):
122 xinfo = xseg_reply_info()
127 def get_hash_reply(hashstring):
128 xhash = xseg_reply_hash()
129 xhash.target = hashstring
130 xhash.targetlen = len(hashstring)
def get_object_name(volume, epoch, index):
    """Build the object name for block ``index`` of ``volume`` at ``epoch``.

    The result is ``"archip_" + volume + "_" + <epoch hex> + "_" + <index hex>``.
    Each hex field is the hexlified in-memory representation (native byte
    order) of the value stored as an unsigned 64-bit integer, i.e. 16 hex
    digits per field.

    :param volume: volume name; embedded verbatim in the result.
    :param epoch: integer, stored in a ``ctypes.c_uint64`` before encoding.
    :param index: integer, stored in a ``ctypes.c_uint64`` before encoding.
    :returns: the object name as a native string.
    """
    def uint64_hex(value):
        # Read back the raw bytes of the C uint64 exactly as laid out in
        # memory; string_at() accepts the address directly, so no cast to
        # c_char_p is needed.
        as_u64 = ctypes.c_uint64(value)
        raw = ctypes.string_at(ctypes.addressof(as_u64),
                               ctypes.sizeof(as_u64))
        digits = hexlify(raw)
        # hexlify() returns bytes on Python 3; convert so the concatenation
        # below does not raise TypeError there. On Python 2 the value is
        # already a str and is left untouched.
        if not isinstance(digits, str):
            digits = digits.decode('ascii')
        return digits

    return ("archip_" + volume + "_" + uint64_hex(epoch) + "_" +
            uint64_hex(index))
146 def get_map_reply(offset, size):
147 blocksize = XsegTest.blocksize
148 ret = xseg_reply_map()
149 cnt = (offset+size)//blocksize - offset//blocksize
150 if (offset+size) % blocksize > 0 :
153 SegsArray = xseg_reply_map_scatterlist * cnt
156 offset = offset % blocksize
157 for i in range(0, cnt):
158 segs[i].offset = offset
159 segs[i].size = blocksize - offset
160 if segs[i].size > rem_size:
161 segs[i].size = rem_size
163 rem_size -= segs[i].size
165 raise Error("Calculation error")
171 def get_list_of_hashes(xreply, from_segment=False):
176 SegsArray = xseg_reply_map_scatterlist * cnt
177 array = SegsArray.from_address(ctypes.addressof(segs))
179 for i in range(0, cnt):
180 hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
184 def get_zero_map_reply(offset, size):
185 ret = XsegTest.get_map_reply(offset, size);
187 for i in range(0, cnt):
188 ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
189 ret.segs[i].targetlen = len(ret.segs[i].target)
193 def get_copy_map_reply(volume, offset, size, epoch):
194 blocksize = XsegTest.blocksize
195 objidx_start = offset//blocksize
196 ret = XsegTest.get_map_reply(offset, size);
198 for i in range(0, cnt):
199 ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
201 ret.segs[i].targetlen = len(ret.segs[i].target)
204 def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
206 return Request(self.xseg, dst, target, data=data, size=size,
207 offset=offset, datalen=datalen, flags=flags, op=op)
209 def assert_equal_xseg(self, req, expected_data):
210 if isinstance(expected_data, xseg_reply_info):
211 datasize = ctypes.sizeof(expected_data)
212 self.assertEqual(datasize, req.get_datalen())
213 data = req.get_data(type(expected_data)).contents
214 self.assertEqual(data.size, expected_data.size)
215 elif isinstance(expected_data, xseg_reply_map):
216 #since xseg_reply_map uses a flexible array for the
217 #xseg_reply_map_scatterlist reply, we calculate the size of the
218 #reply in the segment, by subtracting the size of the pointer to
219 #the array, in the python object
220 datasize = ctypes.sizeof(expected_data)
221 datasize -= ctypes.sizeof(expected_data.segs)
222 datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
223 self.assertEqual(datasize, req.get_datalen())
224 data = req.get_data(type(expected_data)).contents
226 self.assertEqual(data.cnt, expected_data.cnt)
228 SegsArray = xseg_reply_map_scatterlist * cnt
229 array = SegsArray.from_address(ctypes.addressof(segs))
230 expected_array = expected_data.segs
231 for i in range(0, cnt):
232 t = ctypes.string_at(array[i].target, array[i].targetlen)
233 self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
234 self.assertEqual(t, expected_array[i].target)
235 self.assertEqual(array[i].offset, expected_array[i].offset)
236 self.assertEqual(array[i].size, expected_array[i].size)
237 elif isinstance(expected_data, xseg_reply_hash):
238 datasize = ctypes.sizeof(expected_data)
239 self.assertEqual(datasize, req.get_datalen())
240 data = req.get_data(type(expected_data)).contents
241 self.assertEqual(data.targetlen, expected_data.targetlen)
242 t = ctypes.string_at(data.target, data.targetlen)
243 et = ctypes.string_at(expected_data.target, expected_data.targetlen)
244 self.assertEqual(t, et)
246 raise Error("Unknown data type")
248 def evaluate_req(self, req, success=True, serviced=None, data=None):
250 self.assertFalse(req.success())
253 self.assertTrue(req.success())
254 if serviced is not None:
255 self.assertEqual(req.get_serviced(), serviced)
257 if isinstance(data, basestring):
259 self.assertEqual(datalen, req.get_datalen())
260 self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
262 self.assert_equal_xseg(req, data)
264 def evaluate(send_func):
265 def send_and_evaluate(self, dst, target, expected=True, serviced=None,
266 expected_data=None, **kwargs):
267 req = send_func(self, dst, target, **kwargs)
269 self.evaluate_req(req, success=expected, serviced=serviced,
271 self.assertTrue(req.put())
272 return send_and_evaluate
274 def send_write(self, dst, target, data=None, offset=0, datalen=0):
275 #assert datalen >= size
276 # req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
277 req = Request.get_write_request(self.xseg, dst, target, data=data,
278 offset=offset, datalen=datalen)
282 send_and_evaluate_write = evaluate(send_write)
284 def send_read(self, dst, target, size=0, datalen=0, offset=0):
285 #assert datalen >= size
286 # req = self.get_req(X_READ, dst, target, data=None, size=size, offset=offset, datalen=datalen)
287 req = Request.get_read_request(self.xseg, dst, target, size=size,
288 offset=offset, datalen=datalen)
292 send_and_evaluate_read = evaluate(send_read)
294 def send_info(self, dst, target):
295 #req = self.get_req(X_INFO, dst, target, data=None, size=0)
296 req = Request.get_info_request(self.xseg, dst, target)
300 send_and_evaluate_info = evaluate(send_info)
302 def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
303 #datalen = ctypes.sizeof(xseg_request_copy)
304 #xcopy = xseg_request_copy()
305 #xcopy.target = src_target
306 #xcopy.targetlen = len(src_target)
307 # req = self.get_req(X_COPY, dst, dst_target, data=xcopy, datalen=datalen,
308 # offset=offset, size=size)
309 req = Request.get_copy_request(self.xseg, dst, src_target,
310 copy_target=dst_target, size=size, offset=offset)
314 send_and_evaluate_copy = evaluate(send_copy)
316 def send_acquire(self, dst, target):
317 #req = self.get_req(X_ACQUIRE, dst, target, flags=XF_NOSYNC)
318 req = Request.get_acquire_request(self.xseg, dst, target)
322 send_and_evaluate_acquire = evaluate(send_acquire)
324 def send_release(self, dst, target, force=False):
325 #req_flags = XF_NOSYNC
327 #req_flags |= XF_FORCE
328 #req = self.get_req(X_RELEASE, dst, target, size=0, flags=req_flags)
329 req = Request.get_release_request(self.xseg, dst, target, force)
333 send_and_evaluate_release = evaluate(send_release)
335 def send_delete(self, dst, target):
336 #req = self.get_req(X_DELETE, dst, target)
337 req = Request.get_delete_request(self.xseg, dst, target)
341 send_and_evaluate_delete = evaluate(send_delete)
343 def send_clone(self, dst, src_target, clone=None, clone_size=0,
345 #xclone = xseg_request_clone()
346 #xclone.target = src_target
347 #xclone.targetlen = len(src_target)
348 #xclone.size = clone_size
350 #req = self.get_req(X_CLONE, dst, clone, data=xclone,
351 #datalen=ctypes.sizeof(xclone))
352 req = Request.get_clone_request(self.xseg, dst, src_target,
353 clone=clone, clone_size=clone_size, cont_addr=cont_addr)
357 send_and_evaluate_clone = evaluate(send_clone)
359 def send_snapshot(self, dst, src_target, snap=None):
360 #xsnapshot = xseg_request_snapshot()
361 #xsnapshot.target = snap
362 #xsnapshot.targetlen = len(snap)
364 #req = self.get_req(X_SNAPSHOT, dst, src_target, data=xsnapshot,
365 #datalen=ctypes.sizeof(xsnapshot))
366 req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
370 send_and_evaluate_snapshot = evaluate(send_snapshot)
372 def send_open(self, dst, target):
373 #req = self.get_req(X_OPEN, dst, target)
374 req = Request.get_open_request(self.xseg, dst, target)
378 send_and_evaluate_open = evaluate(send_open)
380 def send_close(self, dst, target):
381 #req = self.get_req(X_CLOSE, dst, target)
382 req = Request.get_close_request(self.xseg, dst, target)
386 send_and_evaluate_close = evaluate(send_close)
388 def send_map_read(self, dst, target, offset=0, size=0):
389 #req = self.get_req(X_MAPR, dst, target, size=size, offset=offset,
391 req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
396 send_and_evaluate_map_read = evaluate(send_map_read)
398 def send_map_write(self, dst, target, offset=0, size=0):
399 #req = self.get_req(X_MAPW, dst, target, size=size, offset=offset,
401 req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
406 send_and_evaluate_map_write = evaluate(send_map_write)
408 def send_hash(self, dst, target, size=0):
409 #req = self.get_req(X_hash, dst, target, data=None, size=0)
410 req = Request.get_hash_request(self.xseg, dst, target, size=size)
414 send_and_evaluate_hash = evaluate(send_hash)
416 def get_filed(self, args, clean=False):
417 path = args['archip_dir']
418 if not os.path.exists(path):
422 recursive_remove(path)
426 def get_sosd(self, args, clean=False):
429 cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
431 if cluster.pool_exists(pool):
432 cluster.delete_pool(pool)
433 cluster.create_pool(pool)
def get_mapperd(self, args):
    """Construct a Mapperd peer from a dict of keyword arguments.

    :param args: mapping whose items are passed to ``Mapperd`` as keyword
                 arguments.
    :returns: the newly constructed ``Mapperd`` object.
    """
    mapper_peer = Mapperd(**args)
    return mapper_peer
441 def get_vlmcd(self, args):
444 class VlmcdTest(XsegTest):
446 'role': 'vlmctest-blockerb',
447 'spec': XsegTest.spec,
449 'archip_dir': '/tmp/bfiledtest/',
457 'role': 'vlmctest-blockerm',
458 'spec': XsegTest.spec,
460 'archip_dir': '/tmp/mfiledtest/',
468 'role': 'vlmctest-mapper',
469 'spec': XsegTest.spec,
479 'role': 'vlmctest-vlmc',
480 'spec': XsegTest.spec,
491 super(VlmcdTest, self).setUp()
493 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
494 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
495 self.mapperd = self.get_mapperd(self.mapperd_args)
496 self.vlmcd = self.get_vlmcd(self.vlmcd_args)
497 self.vlmcdport = self.vlmcd.portno_start
498 self.mapperdport = self.mapperd.portno_start
499 self.blockerbport = self.blockerb.portno_start
500 start_peer(self.blockerm)
501 start_peer(self.blockerb)
502 start_peer(self.mapperd)
503 start_peer(self.vlmcd)
504 except Exception as e:
506 stop_peer(self.vlmcd)
507 stop_peer(self.mapperd)
508 stop_peer(self.blockerb)
509 stop_peer(self.blockerm)
510 super(VlmcdTest, self).tearDown()
514 stop_peer(self.vlmcd)
515 stop_peer(self.mapperd)
516 stop_peer(self.blockerb)
517 stop_peer(self.blockerm)
518 super(VlmcdTest, self).tearDown()
522 volsize = 10*1024*1024
523 self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
524 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
526 self.send_and_evaluate_open(self.vlmcdport, volume)
527 self.send_and_evaluate_open(self.vlmcdport, volume)
529 def test_close(self):
531 volsize = 10*1024*1024
532 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
533 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
535 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
536 self.send_and_evaluate_open(self.vlmcdport, volume)
537 self.send_and_evaluate_close(self.vlmcdport, volume)
538 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
542 volsize = 10*1024*1024
543 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
545 xinfo = self.get_reply_info(volsize)
546 self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)
548 def test_write_read(self):
550 data = get_random_string(datalen, 16)
552 volsize = 10*1024*1024
554 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
556 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
558 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
560 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
562 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
565 def test_clone_snapshot(self):
568 snap2 = "mysnapshot2"
569 snap2 = "mysnapshot3"
573 volsize = 100*1024*1024*1024
574 clone2size = 200*1024*1024*1024
575 offset = 90*1024*1024*1024
578 zeros = '\x00' * size
579 data = get_random_string(size, 16)
580 data2 = get_random_string(size, 16)
582 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
584 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
585 offset=offset, expected_data=zeros)
587 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
588 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
589 offset=offset, expected_data=zeros)
590 self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
592 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
593 offset=offset, expected_data=zeros)
594 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
595 offset=offset, expected_data=data)
597 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
598 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
599 offset=offset, expected_data=data)
600 self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
601 clone_size=clone2size)
602 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
603 offset=offset, expected_data=data)
604 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
605 offset=volsize+offset, expected_data=zeros)
607 self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
608 offset=offset, serviced=size)
609 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
610 offset=offset, expected_data=data2)
611 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
612 offset=offset, expected_data=data)
613 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
614 offset=offset, expected_data=data)
615 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
616 offset=offset, expected_data=zeros)
618 def test_info2(self):
620 volsize = 10*1024*1024
621 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
623 xinfo = self.get_reply_info(volsize)
625 reqs.add(self.send_info(self.vlmcdport, volume))
626 reqs.add(self.send_info(self.vlmcdport, volume))
628 req = self.xseg.wait_requests(reqs)
629 self.evaluate_req(req, data=xinfo)
633 blocksize = self.blocksize
635 volume2 = "myvolume2"
638 volsize = 10*1024*1024
642 data = get_random_string(size, 16)
644 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
646 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
647 offset=offset, serviced=size)
649 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
651 self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
653 req = self.send_hash(self.mapperdport, snap, size=volsize)
655 self.assertTrue(req.success())
656 xreply = req.get_data(xseg_reply_hash).contents
657 hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
660 req = self.send_map_read(self.mapperdport, snap, offset=0,
663 self.assertTrue(req.success())
664 xreply = req.get_data(xseg_reply_map).contents
665 blocks = self.get_list_of_hashes(xreply, from_segment=True)
669 if (b == sha256('').hexdigest()):
670 h.append(unhexlify(b))
672 req = self.send_hash(self.blockerbport, b, size=blocksize)
674 self.assertTrue(req.success())
675 xreply = req.get_data(xseg_reply_hash).contents
676 h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
679 mh = hexlify(merkle_hash(h))
680 self.assertEqual(hash_map, mh)
682 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
683 clone_size=volsize * 2, expected=False)
684 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
685 clone_size=volsize * 2, cont_addr=True)
686 self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
687 offset=offset, expected_data=data)
688 self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
689 offset=offset + size, expected_data='\x00' * (volsize - size))
692 class MapperdTest(XsegTest):
694 'role': 'mappertest-blockerb',
695 'spec': XsegTest.spec,
697 'archip_dir': '/tmp/bfiledtest/',
705 'role': 'mappertest-blockerm',
706 'spec': XsegTest.spec,
708 'archip_dir': '/tmp/mfiledtest/',
716 'role': 'mappertest-mapper',
717 'spec': XsegTest.spec,
726 blocksize = 4*1024*1024
729 super(MapperdTest, self).setUp()
731 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
732 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
733 self.mapperd = self.get_mapperd(self.mapperd_args)
734 self.mapperdport = self.mapperd.portno_start
735 start_peer(self.blockerm)
736 start_peer(self.blockerb)
737 start_peer(self.mapperd)
738 except Exception as e:
740 stop_peer(self.mapperd)
741 stop_peer(self.blockerb)
742 stop_peer(self.blockerm)
743 super(MapperdTest, self).tearDown()
747 stop_peer(self.mapperd)
748 stop_peer(self.blockerb)
749 stop_peer(self.blockerm)
750 super(MapperdTest, self).tearDown()
752 def test_create(self):
754 volsize = 10*1024*1024
755 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
756 clone_size=0, expected=False)
757 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
759 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
760 clone_size=volsize, expected=False)
762 def test_delete(self):
764 volsize = 10*1024*1024
769 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
770 clone_size=0, expected=False)
771 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
773 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
774 clone_size=volsize, expected=False)
775 self.send_and_evaluate_delete(self.mapperdport, volume)
776 self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
777 self.send_and_evaluate_map_write(self.mapperdport, volume,
778 offset=offset, size=size, expected=False)
779 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
782 ret = self.get_copy_map_reply(volume, offset, size, epoch)
784 self.send_and_evaluate_map_write(self.mapperdport, volume,
785 expected_data=ret, offset=offset, size=size)
787 def test_clone_snapshot(self):
790 snap2 = "mysnapshot2"
791 snap2 = "mysnapshot3"
794 volsize = 100*1024*1024*1024
795 clone2size = 200*1024*1024*1024
796 offset = 90*1024*1024*1024
802 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
804 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
805 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
807 xinfo = self.get_reply_info(volsize)
808 self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
809 ret = self.get_zero_map_reply(offset, size)
810 self.send_and_evaluate_map_read(self.mapperdport, volume,
811 expected_data=ret, offset=offset, size=size)
812 self.send_and_evaluate_map_read(self.mapperdport, snap,
813 expected_data=ret, offset=offset, size=size)
815 ret = self.get_copy_map_reply(volume, offset, size, epoch)
816 self.send_and_evaluate_map_write(self.mapperdport, volume,
817 expected_data=ret, offset=offset, size=size)
819 stop_peer(self.mapperd)
820 start_peer(self.mapperd)
821 self.send_and_evaluate_map_read(self.mapperdport, volume,
822 expected_data=ret, offset=offset, size=size)
824 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
825 xinfo = self.get_reply_info(volsize)
826 self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
827 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
828 clone_size=2, expected=False)
829 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
830 clone_size=clone2size)
831 xinfo = self.get_reply_info(clone2size)
832 self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
836 volsize = 10*1024*1024
837 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
839 xinfo = self.get_reply_info(volsize)
840 self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
842 def test_info2(self):
844 volsize = 10*1024*1024
845 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
847 xinfo = self.get_reply_info(volsize)
849 reqs.add(self.send_info(self.mapperdport, volume))
850 reqs.add(self.send_info(self.mapperdport, volume))
852 req = self.xseg.wait_requests(reqs)
853 self.evaluate_req(req, data=xinfo)
858 volsize = 10*1024*1024
859 self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
860 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
862 self.send_and_evaluate_open(self.mapperdport, volume)
863 self.send_and_evaluate_open(self.mapperdport, volume)
865 def test_open2(self):
867 volsize = 10*1024*1024
868 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
871 reqs.add(self.send_open(self.mapperdport, volume))
872 reqs.add(self.send_open(self.mapperdport, volume))
874 req = self.xseg.wait_requests(reqs)
875 self.evaluate_req(req)
878 def test_close(self):
880 volsize = 10*1024*1024
881 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
882 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
884 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
885 self.send_and_evaluate_open(self.mapperdport, volume)
886 self.send_and_evaluate_close(self.mapperdport, volume)
887 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
891 volsize = 10*1024*1024
894 ret = MapperdTest.get_zero_map_reply(offset, size)
895 self.send_and_evaluate_map_read(self.mapperdport, volume,
896 offset=offset, size=size, expected=False)
897 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
899 self.send_and_evaluate_map_read(self.mapperdport, volume,
900 expected_data=ret, offset=offset, size=size)
902 self.send_and_evaluate_map_read(self.mapperdport, volume,
903 offset=offset, size=size, expected=False)
905 self.send_and_evaluate_map_read(self.mapperdport, volume,
906 offset=offset, size=size, expected=False)
908 def test_mapr2(self):
910 volsize = 10*1024*1024
914 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
917 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
919 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
921 ret = MapperdTest.get_zero_map_reply(offset, size)
923 req = self.xseg.wait_requests(reqs)
924 self.evaluate_req(req, data=ret)
928 blocksize = self.blocksize
930 volsize = 100*1024*1024*1024
931 offset = 90*1024*1024*1024 - 2
935 ret = self.get_copy_map_reply(volume, offset, size, epoch)
937 self.send_and_evaluate_map_write(self.mapperdport, volume,
938 offset=offset, size=size, expected=False)
939 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
941 self.send_and_evaluate_map_write(self.mapperdport, volume,
942 expected_data=ret, offset=offset, size=size)
943 self.send_and_evaluate_map_read(self.mapperdport, volume,
944 expected_data = ret, offset=offset, size=size)
945 stop_peer(self.mapperd)
946 start_peer(self.mapperd)
947 self.send_and_evaluate_map_read(self.mapperdport, volume,
948 expected_data=ret, offset=offset, size=size)
949 self.send_and_evaluate_open(self.mapperdport, volume)
950 offset = 101*1024*1024*1024
951 self.send_and_evaluate_map_write(self.mapperdport, volume,
952 offset=offset, size=size, expected=False)
953 offset = 100*1024*1024*1024 - 1
954 self.send_and_evaluate_map_write(self.mapperdport, volume,
955 offset=offset, size=size, expected=False)
957 def test_mapw2(self):
958 blocksize = self.blocksize
960 volsize = 100*1024*1024*1024
961 offset = 90*1024*1024*1024 - 2
965 ret = self.get_copy_map_reply(volume, offset, size, epoch)
967 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
971 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
973 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
975 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
977 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
979 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
981 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
983 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
986 req = self.xseg.wait_requests(reqs)
987 self.evaluate_req(req, data=ret)
990 class BlockerTest(object):
991 def test_write_read(self):
993 data = get_random_string(datalen, 16)
995 xinfo = self.get_reply_info(datalen)
997 self.send_and_evaluate_write(self.blockerport, target, data=data,
999 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1001 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1002 stop_peer(self.blocker)
1003 start_peer(self.blocker)
1004 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1006 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1008 def test_info(self):
1010 data = get_random_string(datalen, 16)
1012 self.send_and_evaluate_write(self.blockerport, target, data=data,
1014 xinfo = self.get_reply_info(datalen)
1015 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1017 def test_copy(self):
1019 data = get_random_string(datalen, 16)
1021 copy_target = "copy_target"
1023 self.send_and_evaluate_write(self.blockerport, target, data=data,
1025 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1026 expected_data=data, serviced=datalen)
1027 self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1028 size=datalen, serviced=datalen)
1029 self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
1033 def test_delete(self):
1035 data = get_random_string(datalen, 16)
1037 self.send_and_evaluate_delete(self.blockerport, target, False)
1038 self.send_and_evaluate_write(self.blockerport, target, data=data)
1039 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1041 self.send_and_evaluate_delete(self.blockerport, target, True)
1042 data = '\x00' * datalen
1043 self.send_and_evaluate_read(self.blockerport, target, size=datalen, expected_data=data)
1045 def test_hash(self):
1047 data = '\x00'*datalen
1048 target = "target_zeros"
1051 self.send_and_evaluate_write(self.blockerport, target, data=data,
1053 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1054 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1055 expected_data=ret, serviced=datalen)
1058 data = get_random_string(datalen, 16)
1059 self.send_and_evaluate_write(self.blockerport, target, data=data,
1061 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1062 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1063 expected_data=ret, serviced=datalen)
1064 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1065 expected_data=ret, serviced=datalen)
1066 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1067 expected_data=ret, serviced=datalen)
1069 def test_locking(self):
1071 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1072 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1073 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1074 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1075 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1076 stop_peer(self.blocker)
1077 start_peer(self.blocker)
1078 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1079 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1080 self.send_and_evaluate_release(self.blockerport, target, force=True,
1084 class FiledTest(BlockerTest, XsegTest):
1086 'role': 'testfiled',
1087 'spec': XsegTest.spec,
1089 'archip_dir': '/tmp/filedtest/',
1090 'prefix': 'archip_',
1098 super(FiledTest, self).setUp()
1100 self.blocker = self.get_filed(self.filed_args, clean=True)
1101 self.blockerport = self.blocker.portno_start
1102 start_peer(self.blocker)
1103 except Exception as e:
1104 super(FiledTest, self).tearDown()
1108 stop_peer(self.blocker)
1109 super(FiledTest, self).tearDown()
1111 def test_locking(self):
1113 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1114 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1115 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1116 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1117 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1118 stop_peer(self.blocker)
1119 new_filed_args = copy(self.filed_args)
1120 new_filed_args['unique_str'] = 'ThisisSparta'
1121 self.blocker = Filed(**new_filed_args)
1122 start_peer(self.blocker)
1123 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1124 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1125 self.send_and_evaluate_release(self.blockerport, target, force=True,
1128 class SosdTest(BlockerTest, XsegTest):
1131 'spec': XsegTest.spec,
1137 'pool': 'test_sosd',
1142 super(SosdTest, self).setUp()
1144 self.blocker = self.get_sosd(self.filed_args, clean=True)
1145 self.blockerport = self.blocker.portno_start
1146 start_peer(self.blocker)
1147 except Exception as e:
1148 super(SosdTest, self).tearDown()
1152 stop_peer(self.blocker)
1153 super(SosdTest, self).tearDown()
1155 if __name__=='__main__':