1 # Copyright 2013 GRNET S.A. All rights reserved.
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
7 # 1. Redistributions of source code must retain the above
8 # copyright notice, this list of conditions and the following
11 # 2. Redistributions in binary form must reproduce the above
12 # copyright notice, this list of conditions and the following
13 # disclaimer in the documentation and/or other materials
14 # provided with the distribution.
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
36 create_segment, destroy_segment, Error
37 from archipelago.archipelago import start_peer, stop_peer
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
def get_random_string(length=64, repeat=16):
    """Return a string of `length` characters built from a random pattern.

    A pattern of `repeat` random lowercase ASCII letters is drawn once and
    then tiled until `length` characters have been produced; the final copy
    of the pattern is truncated when `length` is not a multiple of `repeat`.
    """
    # How many whole copies of the pattern fit, and how many characters of a
    # partial copy are needed to reach the requested length.
    whole_copies, leftover = divmod(length, repeat)

    pattern = ''.join(
        chr(ord('a') + rnd.randint(0, 25)) for _ in range(repeat))

    return pattern * whole_copies + pattern[0:leftover]
def recursive_remove(path):
    """Delete everything beneath `path`, leaving `path` itself in place.

    The tree is walked bottom-up so that every directory is already empty
    by the time it is removed.  Symbolic links are unlinked, never
    followed, so data outside `path` is not touched.
    """
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            entry = os.path.join(root, name)
            # os.walk reports symlinks to directories under `dirs`;
            # os.rmdir refuses to delete a link, so unlink it instead.
            if os.path.islink(entry):
                os.remove(entry)
            else:
                os.rmdir(entry)
def merkle_hash(hashes):
    """Return the Merkle root (raw digest) of a sequence of block hashes.

    `hashes` is a sequence of raw, equally sized digests.  An empty
    sequence yields the SHA-256 digest of the empty string and a single
    hash is returned unchanged.  Otherwise the sequence is padded with
    all-zero digests up to the next power of two and adjacent pairs are
    hashed together until one digest remains.

    The input sequence is never modified.
    """
    if len(hashes) == 0:
        return sha256(b'').digest()
    if len(hashes) == 1:
        return hashes[0]

    # Smallest power of two that can hold every leaf.
    width = 2
    while width < len(hashes):
        width *= 2

    # Work on a copy: padding the argument in place would leak the zero
    # digests back into the caller's list (and fail outright on tuples).
    level = list(hashes)
    level.extend([b'\x00' * len(level[0])] * (width - len(level)))

    while len(level) > 1:
        level = [sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]
# Point the archipelago helpers at locations relative to the current
# working directory: peer binaries, log files and pid files.
_cwd = os.getcwd()
archipelago.common.BIN_DIR = os.path.join(_cwd, '../../peers/user/')
archipelago.common.LOGS_PATH = os.path.join(_cwd, 'logs')
archipelago.common.PIDFILE_PATH = os.path.join(_cwd, 'pids')

# Make sure both runtime directories exist before any peer is started.
for _runtime_dir in (archipelago.common.LOGS_PATH,
                     archipelago.common.PIDFILE_PATH):
    if not os.path.isdir(_runtime_dir):
        os.makedirs(_runtime_dir)

# Start every run with an empty logs directory.
recursive_remove(archipelago.common.LOGS_PATH)
99 class XsegTest(unittest.TestCase):
102 spec = "posix:testsegment:16:256:12".encode()
103 blocksize = 4*1024*1024
107 create_segment(self.spec)
108 except Exception as e:
109 destroy_segment(self.spec)
110 create_segment(self.spec)
111 self.xseg = Xseg_ctx(self.spec, self.myport)
116 destroy_segment(self.spec)
119 def get_reply_info(size):
120 xinfo = xseg_reply_info()
125 def get_hash_reply(hashstring):
126 xhash = xseg_reply_hash()
127 xhash.target = hashstring
128 xhash.targetlen = len(hashstring)
def get_object_name(volume, epoch, index):
    """Return the object name for `volume` at the given epoch and index.

    The name has the form ``archip_<volume>_<epoch>_<index>`` where epoch
    and index are each rendered as the 16 hex digits of their in-memory
    unsigned 64-bit representation (native byte order).

    The result is always a native `str`, so the concatenation no longer
    depends on `hexlify` returning `str`.
    """
    def _hex_u64(value):
        # Hex dump of the native 8-byte layout of an unsigned 64-bit int.
        raw = ctypes.c_uint64(value)
        # string_at accepts a plain address, so no pointer cast is needed.
        packed = ctypes.string_at(ctypes.addressof(raw), ctypes.sizeof(raw))
        return str(hexlify(packed).decode('ascii'))

    return "archip_" + volume + "_" + _hex_u64(epoch) + "_" + _hex_u64(index)
144 def get_map_reply(offset, size):
145 blocksize = XsegTest.blocksize
146 ret = xseg_reply_map()
147 cnt = (offset+size)//blocksize - offset//blocksize
148 if (offset+size) % blocksize > 0 :
151 SegsArray = xseg_reply_map_scatterlist * cnt
154 offset = offset % blocksize
155 for i in range(0, cnt):
156 segs[i].offset = offset
157 segs[i].size = blocksize - offset
158 if segs[i].size > rem_size:
159 segs[i].size = rem_size
161 rem_size -= segs[i].size
163 raise Error("Calculation error")
169 def get_list_of_hashes(xreply, from_segment=False):
174 SegsArray = xseg_reply_map_scatterlist * cnt
175 array = SegsArray.from_address(ctypes.addressof(segs))
177 for i in range(0, cnt):
178 hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
182 def get_zero_map_reply(offset, size):
183 ret = XsegTest.get_map_reply(offset, size);
185 for i in range(0, cnt):
186 ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
187 ret.segs[i].targetlen = len(ret.segs[i].target)
191 def get_copy_map_reply(volume, offset, size, epoch):
192 blocksize = XsegTest.blocksize
193 objidx_start = offset//blocksize
194 ret = XsegTest.get_map_reply(offset, size);
196 for i in range(0, cnt):
197 ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
199 ret.segs[i].targetlen = len(ret.segs[i].target)
202 def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
204 return Request(self.xseg, dst, target, data=data, size=size,
205 offset=offset, datalen=datalen, flags=flags, op=op)
207 def assert_equal_xseg(self, req, expected_data):
208 if isinstance(expected_data, xseg_reply_info):
209 datasize = ctypes.sizeof(expected_data)
210 self.assertEqual(datasize, req.get_datalen())
211 data = req.get_data(type(expected_data)).contents
212 self.assertEqual(data.size, expected_data.size)
213 elif isinstance(expected_data, xseg_reply_map):
214 #since xseg_reply_map uses a flexible array for the
215 #xseg_reply_map_scatterlist reply, we calculate the size of the
216 #reply in the segment, by subtracting the size of the pointer to
217 #the array, in the python object
218 datasize = ctypes.sizeof(expected_data)
219 datasize -= ctypes.sizeof(expected_data.segs)
220 datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
221 self.assertEqual(datasize, req.get_datalen())
222 data = req.get_data(type(expected_data)).contents
224 self.assertEqual(data.cnt, expected_data.cnt)
226 SegsArray = xseg_reply_map_scatterlist * cnt
227 array = SegsArray.from_address(ctypes.addressof(segs))
228 expected_array = expected_data.segs
229 for i in range(0, cnt):
230 t = ctypes.string_at(array[i].target, array[i].targetlen)
231 self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
232 self.assertEqual(t, expected_array[i].target)
233 self.assertEqual(array[i].offset, expected_array[i].offset)
234 self.assertEqual(array[i].size, expected_array[i].size)
235 elif isinstance(expected_data, xseg_reply_hash):
236 datasize = ctypes.sizeof(expected_data)
237 self.assertEqual(datasize, req.get_datalen())
238 data = req.get_data(type(expected_data)).contents
239 self.assertEqual(data.targetlen, expected_data.targetlen)
240 t = ctypes.string_at(data.target, data.targetlen)
241 et = ctypes.string_at(expected_data.target, expected_data.targetlen)
242 self.assertEqual(t, et)
244 raise Error("Unknown data type")
246 def evaluate_req(self, req, success=True, serviced=None, data=None):
248 self.assertFalse(req.success())
251 self.assertTrue(req.success())
252 if serviced is not None:
253 self.assertEqual(req.get_serviced(), serviced)
255 if isinstance(data, basestring):
257 self.assertEqual(datalen, req.get_datalen())
258 self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
260 self.assert_equal_xseg(req, data)
262 def evaluate(send_func):
263 def send_and_evaluate(self, dst, target, expected=True, serviced=None,
264 expected_data=None, **kwargs):
265 req = send_func(self, dst, target, **kwargs)
267 self.evaluate_req(req, success=expected, serviced=serviced,
269 self.assertTrue(req.put())
270 return send_and_evaluate
272 def send_write(self, dst, target, data=None, offset=0, datalen=0):
273 #assert datalen >= size
274 # req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
275 req = Request.get_write_request(self.xseg, dst, target, data=data,
276 offset=offset, datalen=datalen)
280 send_and_evaluate_write = evaluate(send_write)
282 def send_read(self, dst, target, size=0, datalen=0, offset=0):
283 #assert datalen >= size
284 # req = self.get_req(X_READ, dst, target, data=None, size=size, offset=offset, datalen=datalen)
285 req = Request.get_read_request(self.xseg, dst, target, size=size,
286 offset=offset, datalen=datalen)
290 send_and_evaluate_read = evaluate(send_read)
292 def send_info(self, dst, target):
293 #req = self.get_req(X_INFO, dst, target, data=None, size=0)
294 req = Request.get_info_request(self.xseg, dst, target)
298 send_and_evaluate_info = evaluate(send_info)
300 def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
301 #datalen = ctypes.sizeof(xseg_request_copy)
302 #xcopy = xseg_request_copy()
303 #xcopy.target = src_target
304 #xcopy.targetlen = len(src_target)
305 # req = self.get_req(X_COPY, dst, dst_target, data=xcopy, datalen=datalen,
306 # offset=offset, size=size)
307 req = Request.get_copy_request(self.xseg, dst, src_target,
308 copy_target=dst_target, size=size, offset=offset)
312 send_and_evaluate_copy = evaluate(send_copy)
314 def send_acquire(self, dst, target):
315 #req = self.get_req(X_ACQUIRE, dst, target, flags=XF_NOSYNC)
316 req = Request.get_acquire_request(self.xseg, dst, target)
320 send_and_evaluate_acquire = evaluate(send_acquire)
322 def send_release(self, dst, target, force=False):
323 #req_flags = XF_NOSYNC
325 #req_flags |= XF_FORCE
326 #req = self.get_req(X_RELEASE, dst, target, size=0, flags=req_flags)
327 req = Request.get_release_request(self.xseg, dst, target, force)
331 send_and_evaluate_release = evaluate(send_release)
333 def send_delete(self, dst, target):
334 #req = self.get_req(X_DELETE, dst, target)
335 req = Request.get_delete_request(self.xseg, dst, target)
339 send_and_evaluate_delete = evaluate(send_delete)
341 def send_clone(self, dst, src_target, clone=None, clone_size=0,
343 #xclone = xseg_request_clone()
344 #xclone.target = src_target
345 #xclone.targetlen = len(src_target)
346 #xclone.size = clone_size
348 #req = self.get_req(X_CLONE, dst, clone, data=xclone,
349 #datalen=ctypes.sizeof(xclone))
350 req = Request.get_clone_request(self.xseg, dst, src_target,
351 clone=clone, clone_size=clone_size, cont_addr=cont_addr)
355 send_and_evaluate_clone = evaluate(send_clone)
357 def send_snapshot(self, dst, src_target, snap=None):
358 #xsnapshot = xseg_request_snapshot()
359 #xsnapshot.target = snap
360 #xsnapshot.targetlen = len(snap)
362 #req = self.get_req(X_SNAPSHOT, dst, src_target, data=xsnapshot,
363 #datalen=ctypes.sizeof(xsnapshot))
364 req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
368 send_and_evaluate_snapshot = evaluate(send_snapshot)
370 def send_open(self, dst, target):
371 #req = self.get_req(X_OPEN, dst, target)
372 req = Request.get_open_request(self.xseg, dst, target)
376 send_and_evaluate_open = evaluate(send_open)
378 def send_close(self, dst, target):
379 #req = self.get_req(X_CLOSE, dst, target)
380 req = Request.get_close_request(self.xseg, dst, target)
384 send_and_evaluate_close = evaluate(send_close)
386 def send_map_read(self, dst, target, offset=0, size=0):
387 #req = self.get_req(X_MAPR, dst, target, size=size, offset=offset,
389 req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
394 send_and_evaluate_map_read = evaluate(send_map_read)
396 def send_map_write(self, dst, target, offset=0, size=0):
397 #req = self.get_req(X_MAPW, dst, target, size=size, offset=offset,
399 req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
404 send_and_evaluate_map_write = evaluate(send_map_write)
406 def send_hash(self, dst, target, size=0):
407 #req = self.get_req(X_hash, dst, target, data=None, size=0)
408 req = Request.get_hash_request(self.xseg, dst, target, size=size)
412 send_and_evaluate_hash = evaluate(send_hash)
414 def get_filed(self, args, clean=False):
415 path = args['archip_dir']
416 if not os.path.exists(path):
420 recursive_remove(path)
424 def get_sosd(self, args, clean=False):
427 cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
429 if cluster.pool_exists(pool):
430 cluster.delete_pool(pool)
431 cluster.create_pool(pool)
436 def get_mapperd(self, args):
437 return Mapperd(**args)
439 def get_vlmcd(self, args):
442 class VlmcdTest(XsegTest):
444 'role': 'vlmctest-blockerb',
445 'spec': XsegTest.spec,
447 'archip_dir': '/tmp/bfiledtest/',
455 'role': 'vlmctest-blockerm',
456 'spec': XsegTest.spec,
458 'archip_dir': '/tmp/mfiledtest/',
466 'role': 'vlmctest-mapper',
467 'spec': XsegTest.spec,
477 'role': 'vlmctest-vlmc',
478 'spec': XsegTest.spec,
489 super(VlmcdTest, self).setUp()
491 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
492 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
493 self.mapperd = self.get_mapperd(self.mapperd_args)
494 self.vlmcd = self.get_vlmcd(self.vlmcd_args)
495 self.vlmcdport = self.vlmcd.portno_start
496 self.mapperdport = self.mapperd.portno_start
497 self.blockerbport = self.blockerb.portno_start
498 start_peer(self.blockerm)
499 start_peer(self.blockerb)
500 start_peer(self.mapperd)
501 start_peer(self.vlmcd)
502 except Exception as e:
504 stop_peer(self.vlmcd)
505 stop_peer(self.mapperd)
506 stop_peer(self.blockerb)
507 stop_peer(self.blockerm)
508 super(VlmcdTest, self).tearDown()
512 stop_peer(self.vlmcd)
513 stop_peer(self.mapperd)
514 stop_peer(self.blockerb)
515 stop_peer(self.blockerm)
516 super(VlmcdTest, self).tearDown()
520 volsize = 10*1024*1024
521 self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
522 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
524 self.send_and_evaluate_open(self.vlmcdport, volume)
525 self.send_and_evaluate_open(self.vlmcdport, volume)
527 def test_close(self):
529 volsize = 10*1024*1024
530 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
531 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
533 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
534 self.send_and_evaluate_open(self.vlmcdport, volume)
535 self.send_and_evaluate_close(self.vlmcdport, volume)
536 self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
540 volsize = 10*1024*1024
541 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
543 xinfo = self.get_reply_info(volsize)
544 self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)
546 def test_write_read(self):
548 data = get_random_string(datalen, 16)
550 volsize = 10*1024*1024
552 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
554 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
556 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
558 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
560 self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
563 def test_clone_snapshot(self):
566 snap2 = "mysnapshot2"
567 snap2 = "mysnapshot3"
571 volsize = 100*1024*1024*1024
572 clone2size = 200*1024*1024*1024
573 offset = 90*1024*1024*1024
576 zeros = '\x00' * size
577 data = get_random_string(size, 16)
578 data2 = get_random_string(size, 16)
580 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
582 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
583 offset=offset, expected_data=zeros)
585 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
586 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
587 offset=offset, expected_data=zeros)
588 self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
590 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
591 offset=offset, expected_data=zeros)
592 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
593 offset=offset, expected_data=data)
595 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
596 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
597 offset=offset, expected_data=data)
598 self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
599 clone_size=clone2size)
600 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
601 offset=offset, expected_data=data)
602 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
603 offset=volsize+offset, expected_data=zeros)
605 self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
606 offset=offset, serviced=size)
607 self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
608 offset=offset, expected_data=data2)
609 self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
610 offset=offset, expected_data=data)
611 self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
612 offset=offset, expected_data=data)
613 self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
614 offset=offset, expected_data=zeros)
616 def test_info2(self):
618 volsize = 10*1024*1024
619 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
621 xinfo = self.get_reply_info(volsize)
623 reqs.add(self.send_info(self.vlmcdport, volume))
624 reqs.add(self.send_info(self.vlmcdport, volume))
626 req = self.xseg.wait_requests(reqs)
627 self.evaluate_req(req, data=xinfo)
631 blocksize = self.blocksize
633 volume2 = "myvolume2"
636 volsize = 10*1024*1024
640 data = get_random_string(size, 16)
642 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
644 self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
645 offset=offset, serviced=size)
647 self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
649 self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
651 req = self.send_hash(self.mapperdport, snap, size=volsize)
653 self.assertTrue(req.success())
654 xreply = req.get_data(xseg_reply_hash).contents
655 hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
658 req = self.send_map_read(self.mapperdport, snap, offset=0,
661 self.assertTrue(req.success())
662 xreply = req.get_data(xseg_reply_map).contents
663 blocks = self.get_list_of_hashes(xreply, from_segment=True)
667 if (b == sha256('').hexdigest()):
668 h.append(unhexlify(b))
670 req = self.send_hash(self.blockerbport, b, size=blocksize)
672 self.assertTrue(req.success())
673 xreply = req.get_data(xseg_reply_hash).contents
674 h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
677 mh = hexlify(merkle_hash(h))
678 self.assertEqual(hash_map, mh)
680 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
681 clone_size=volsize * 2, expected=False)
682 self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
683 clone_size=volsize * 2, cont_addr=True)
684 self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
685 offset=offset, expected_data=data)
686 self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
687 offset=offset + size, expected_data='\x00' * (volsize - size))
690 class MapperdTest(XsegTest):
692 'role': 'mappertest-blockerb',
693 'spec': XsegTest.spec,
695 'archip_dir': '/tmp/bfiledtest/',
703 'role': 'mappertest-blockerm',
704 'spec': XsegTest.spec,
706 'archip_dir': '/tmp/mfiledtest/',
714 'role': 'mappertest-mapper',
715 'spec': XsegTest.spec,
724 blocksize = 4*1024*1024
727 super(MapperdTest, self).setUp()
729 self.blockerm = self.get_filed(self.mfiled_args, clean=True)
730 self.blockerb = self.get_filed(self.bfiled_args, clean=True)
731 self.mapperd = self.get_mapperd(self.mapperd_args)
732 self.mapperdport = self.mapperd.portno_start
733 start_peer(self.blockerm)
734 start_peer(self.blockerb)
735 start_peer(self.mapperd)
736 except Exception as e:
738 stop_peer(self.mapperd)
739 stop_peer(self.blockerb)
740 stop_peer(self.blockerm)
741 super(MapperdTest, self).tearDown()
745 stop_peer(self.mapperd)
746 stop_peer(self.blockerb)
747 stop_peer(self.blockerm)
748 super(MapperdTest, self).tearDown()
750 def test_create(self):
752 volsize = 10*1024*1024
753 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
754 clone_size=0, expected=False)
755 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
757 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
758 clone_size=volsize, expected=False)
760 def test_delete(self):
762 volsize = 10*1024*1024
767 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
768 clone_size=0, expected=False)
769 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
771 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
772 clone_size=volsize, expected=False)
773 self.send_and_evaluate_delete(self.mapperdport, volume)
774 self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
775 self.send_and_evaluate_map_write(self.mapperdport, volume,
776 offset=offset, size=size, expected=False)
777 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
780 ret = self.get_copy_map_reply(volume, offset, size, epoch)
782 self.send_and_evaluate_map_write(self.mapperdport, volume,
783 expected_data=ret, offset=offset, size=size)
785 def test_clone_snapshot(self):
788 snap2 = "mysnapshot2"
789 snap2 = "mysnapshot3"
792 volsize = 100*1024*1024*1024
793 clone2size = 200*1024*1024*1024
794 offset = 90*1024*1024*1024
800 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
802 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
803 self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
805 xinfo = self.get_reply_info(volsize)
806 self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
807 ret = self.get_zero_map_reply(offset, size)
808 self.send_and_evaluate_map_read(self.mapperdport, volume,
809 expected_data=ret, offset=offset, size=size)
810 self.send_and_evaluate_map_read(self.mapperdport, snap,
811 expected_data=ret, offset=offset, size=size)
813 ret = self.get_copy_map_reply(volume, offset, size, epoch)
814 self.send_and_evaluate_map_write(self.mapperdport, volume,
815 expected_data=ret, offset=offset, size=size)
817 stop_peer(self.mapperd)
818 start_peer(self.mapperd)
819 self.send_and_evaluate_map_read(self.mapperdport, volume,
820 expected_data=ret, offset=offset, size=size)
822 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
823 xinfo = self.get_reply_info(volsize)
824 self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
825 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
826 clone_size=2, expected=False)
827 self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
828 clone_size=clone2size)
829 xinfo = self.get_reply_info(clone2size)
830 self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
834 volsize = 10*1024*1024
835 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
837 xinfo = self.get_reply_info(volsize)
838 self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
840 def test_info2(self):
842 volsize = 10*1024*1024
843 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
845 xinfo = self.get_reply_info(volsize)
847 reqs.add(self.send_info(self.mapperdport, volume))
848 reqs.add(self.send_info(self.mapperdport, volume))
850 req = self.xseg.wait_requests(reqs)
851 self.evaluate_req(req, data=xinfo)
856 volsize = 10*1024*1024
857 self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
858 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
860 self.send_and_evaluate_open(self.mapperdport, volume)
861 self.send_and_evaluate_open(self.mapperdport, volume)
863 def test_open2(self):
865 volsize = 10*1024*1024
866 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
869 reqs.add(self.send_open(self.mapperdport, volume))
870 reqs.add(self.send_open(self.mapperdport, volume))
872 req = self.xseg.wait_requests(reqs)
873 self.evaluate_req(req)
876 def test_close(self):
878 volsize = 10*1024*1024
879 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
880 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
882 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
883 self.send_and_evaluate_open(self.mapperdport, volume)
884 self.send_and_evaluate_close(self.mapperdport, volume)
885 self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
889 volsize = 10*1024*1024
892 ret = MapperdTest.get_zero_map_reply(offset, size)
893 self.send_and_evaluate_map_read(self.mapperdport, volume,
894 offset=offset, size=size, expected=False)
895 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
897 self.send_and_evaluate_map_read(self.mapperdport, volume,
898 expected_data=ret, offset=offset, size=size)
900 self.send_and_evaluate_map_read(self.mapperdport, volume,
901 offset=offset, size=size, expected=False)
903 self.send_and_evaluate_map_read(self.mapperdport, volume,
904 offset=offset, size=size, expected=False)
906 def test_mapr2(self):
908 volsize = 10*1024*1024
912 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
915 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
917 reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
919 ret = MapperdTest.get_zero_map_reply(offset, size)
921 req = self.xseg.wait_requests(reqs)
922 self.evaluate_req(req, data=ret)
926 blocksize = self.blocksize
928 volsize = 100*1024*1024*1024
929 offset = 90*1024*1024*1024 - 2
933 ret = self.get_copy_map_reply(volume, offset, size, epoch)
935 self.send_and_evaluate_map_write(self.mapperdport, volume,
936 offset=offset, size=size, expected=False)
937 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
939 self.send_and_evaluate_map_write(self.mapperdport, volume,
940 expected_data=ret, offset=offset, size=size)
941 self.send_and_evaluate_map_read(self.mapperdport, volume,
942 expected_data = ret, offset=offset, size=size)
943 stop_peer(self.mapperd)
944 start_peer(self.mapperd)
945 self.send_and_evaluate_map_read(self.mapperdport, volume,
946 expected_data=ret, offset=offset, size=size)
947 self.send_and_evaluate_open(self.mapperdport, volume)
948 offset = 101*1024*1024*1024
949 self.send_and_evaluate_map_write(self.mapperdport, volume,
950 offset=offset, size=size, expected=False)
951 offset = 100*1024*1024*1024 - 1
952 self.send_and_evaluate_map_write(self.mapperdport, volume,
953 offset=offset, size=size, expected=False)
955 def test_mapw2(self):
956 blocksize = self.blocksize
958 volsize = 100*1024*1024*1024
959 offset = 90*1024*1024*1024 - 2
963 ret = self.get_copy_map_reply(volume, offset, size, epoch)
965 self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
969 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
971 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
973 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
975 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
977 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
979 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
981 reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
984 req = self.xseg.wait_requests(reqs)
985 self.evaluate_req(req, data=ret)
988 class BlockerTest(object):
989 def test_write_read(self):
991 data = get_random_string(datalen, 16)
993 xinfo = self.get_reply_info(datalen)
995 self.send_and_evaluate_write(self.blockerport, target, data=data,
997 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
999 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1000 stop_peer(self.blocker)
1001 start_peer(self.blocker)
1002 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1004 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1006 def test_info(self):
1008 data = get_random_string(datalen, 16)
1010 self.send_and_evaluate_write(self.blockerport, target, data=data,
1012 xinfo = self.get_reply_info(datalen)
1013 self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1015 def test_copy(self):
1017 data = get_random_string(datalen, 16)
1019 copy_target = "copy_target"
1021 self.send_and_evaluate_write(self.blockerport, target, data=data,
1023 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1024 expected_data=data, serviced=datalen)
1025 self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1026 size=datalen, serviced=datalen)
1027 self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
1031 def test_delete(self):
1033 data = get_random_string(datalen, 16)
1035 self.send_and_evaluate_delete(self.blockerport, target, False)
1036 self.send_and_evaluate_write(self.blockerport, target, data=data)
1037 self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1039 self.send_and_evaluate_delete(self.blockerport, target, True)
1040 data = '\x00' * datalen
1041 self.send_and_evaluate_read(self.blockerport, target, size=datalen, expected_data=data)
1043 def test_hash(self):
1045 data = '\x00'*datalen
1046 target = "target_zeros"
1049 self.send_and_evaluate_write(self.blockerport, target, data=data,
1051 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1052 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1053 expected_data=ret, serviced=datalen)
1056 data = get_random_string(datalen, 16)
1057 self.send_and_evaluate_write(self.blockerport, target, data=data,
1059 ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1060 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1061 expected_data=ret, serviced=datalen)
1062 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1063 expected_data=ret, serviced=datalen)
1064 self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1065 expected_data=ret, serviced=datalen)
1067 def test_locking(self):
1069 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1070 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1071 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1072 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1073 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1074 stop_peer(self.blocker)
1075 start_peer(self.blocker)
1076 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1077 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1078 self.send_and_evaluate_release(self.blockerport, target, force=True,
1082 class FiledTest(BlockerTest, XsegTest):
1084 'role': 'testfiled',
1085 'spec': XsegTest.spec,
1087 'archip_dir': '/tmp/filedtest/',
1088 'prefix': 'archip_',
1096 super(FiledTest, self).setUp()
1098 self.blocker = self.get_filed(self.filed_args, clean=True)
1099 self.blockerport = self.blocker.portno_start
1100 start_peer(self.blocker)
1101 except Exception as e:
1102 super(FiledTest, self).tearDown()
1106 stop_peer(self.blocker)
1107 super(FiledTest, self).tearDown()
1109 def test_locking(self):
1111 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1112 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1113 self.send_and_evaluate_release(self.blockerport, target, expected=True)
1114 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1115 self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1116 stop_peer(self.blocker)
1117 new_filed_args = copy(self.filed_args)
1118 new_filed_args['unique_str'] = 'ThisisSparta'
1119 self.blocker = Filed(**new_filed_args)
1120 start_peer(self.blocker)
1121 self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1122 self.send_and_evaluate_release(self.blockerport, target, expected=False)
1123 self.send_and_evaluate_release(self.blockerport, target, force=True,
1126 class SosdTest(BlockerTest, XsegTest):
1129 'spec': XsegTest.spec,
1135 'pool': 'test_sosd',
1140 super(SosdTest, self).setUp()
1142 self.blocker = self.get_sosd(self.filed_args, clean=True)
1143 self.blockerport = self.blocker.portno_start
1144 start_peer(self.blocker)
1145 except Exception as e:
1146 super(SosdTest, self).tearDown()
1150 stop_peer(self.blocker)
1151 super(SosdTest, self).tearDown()
1153 if __name__=='__main__':