e3a9a39c91520c04184049da269dec566ef09689
[archipelago] / xseg / tools / qa / tests.py
1 # Copyright 2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import archipelago
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
36         Error, Segment
37 from archipelago.archipelago import start_peer, stop_peer
38 import random as rnd
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
42 import ctypes
43 import os
44 from copy import copy
45 from sets import Set
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
48
def get_random_string(length=64, repeat=16):
    """Return a string of `length` lowercase letters with period `repeat`.

    A random pattern of `repeat` letters in 'a'..'z' is drawn once and then
    tiled: as many whole copies as fit in `length`, followed by a prefix of
    the pattern that fills the remainder.
    """
    whole_copies, tail_len = divmod(length, repeat)

    first_letter = ord('a')
    pattern = ''.join(chr(first_letter + rnd.randint(0, 25))
                      for _ in range(repeat))

    return pattern * whole_copies + pattern[:tail_len]
64
def recursive_remove(path):
    """Delete everything below `path`, leaving `path` itself in place.

    The tree is walked bottom-up so that each directory is already empty
    by the time its parent tries to rmdir it.
    """
    for parent, subdirs, filenames in os.walk(path, topdown=False):
        # files first, then the (by now empty) sub-directories
        for remover, entries in ((os.remove, filenames), (os.rmdir, subdirs)):
            for entry in entries:
                remover(os.path.join(parent, entry))
71
def merkle_hash(hashes):
    """Return the SHA-256 Merkle root of a sequence of leaf digests.

    - no leaves: the digest of the empty string
    - one leaf: that leaf, unchanged
    - otherwise: the leaves are padded with all-zero entries (as long as
      the first leaf) up to the next power of two, then adjacent pairs
      are hashed together level by level until one digest remains.

    The caller's sequence is never modified; padding is applied to a copy.
    """
    if not hashes:
        return sha256(b'').digest()
    if len(hashes) == 1:
        return hashes[0]

    # smallest power of two that can hold every leaf
    width = 2
    while width < len(hashes):
        width *= 2

    # Pad a private copy: the original code extended `hashes` in place,
    # which silently grew the list owned by the caller.
    zero_leaf = b'\x00' * len(hashes[0])
    level = list(hashes) + [zero_leaf] * (width - len(hashes))

    while len(level) > 1:
        level = [sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]
85     
86
def init():
    """Prepare the process for a test run.

    Seeds the random generator, redirects archipelago's log and pidfile
    locations to 'logs' and 'pids' under the current working directory
    (creating them when missing) and empties the log directory.
    archipelago.common.BIN_DIR is left untouched.
    """
    rnd.seed()

    common = archipelago.common
    common.LOGS_PATH = os.path.join(os.getcwd(), 'logs')
    common.PIDFILE_PATH = os.path.join(os.getcwd(), 'pids')

    for directory in (common.LOGS_PATH, common.PIDFILE_PATH):
        if not os.path.isdir(directory):
            os.makedirs(directory)

    # only the logs are wiped; existing pidfiles are kept
    recursive_remove(common.LOGS_PATH)
98
class XsegTest(unittest.TestCase):
    """Shared fixture and helpers for the peer tests in this file.

    setUp creates the 'testsegment' posix segment and an Xseg_ctx bound to
    `myport` on it; tearDown shuts the context down and destroys the
    segment.  The remaining members build expected reply structures,
    submit the different request types and compare replies against
    expectations.
    """
    # port this test client uses on the segment
    myport = 15
    # segment spec handed to the peers; same values as the Segment()
    # arguments used in setUp
    spec = "posix:testsegment:16:256:12".encode()
    # granularity used to split an (offset, size) range into map entries
    blocksize = 4*1024*1024
    # both are replaced per test in setUp; the defaults let tearDown run
    # its checks even if setUp did not get that far
    segment = None
    xseg = None

    def setUp(self):
        """Create the segment (recreating it if creation fails) and join it."""
        self.segment = Segment('posix', 'testsegment', 16, 256, 12)
        try:
            self.segment.create()
        except Exception:
            # presumably a segment left over from an earlier run; drop it
            # and try once more
            self.segment.destroy()
            self.segment.create()
        self.xseg = Xseg_ctx(self.segment, self.myport)

    def tearDown(self):
        """Shut down the xseg context and destroy the segment, if present."""
        if self.xseg:
            self.xseg.shutdown()
        if self.segment:
            self.segment.destroy()

    @staticmethod
    def get_reply_info(size):
        """Return an xseg_reply_info whose size field is `size`."""
        xinfo = xseg_reply_info()
        xinfo.size = size
        return xinfo

    @staticmethod
    def get_hash_reply(hashstring):
        """Return an xseg_reply_hash carrying `hashstring` as its target."""
        xhash = xseg_reply_hash()
        xhash.target = hashstring
        xhash.targetlen = len(hashstring)
        return xhash

    @staticmethod
    def get_object_name(volume, epoch, index):
        """Return "archip_<volume>_<epoch hex>_<index hex>".

        epoch and index are each rendered as the hex dump of the in-memory
        bytes of a ctypes c_uint64 holding the value.
        """
        def raw_u64_hex(value):
            boxed = ctypes.c_uint64(value)
            raw = ctypes.string_at(ctypes.addressof(boxed),
                                   ctypes.sizeof(ctypes.c_uint64))
            return hexlify(raw)

        return ("archip_" + volume + "_" + raw_u64_hex(epoch) + "_" +
                raw_u64_hex(index))

    @staticmethod
    def get_map_reply(offset, size):
        """Build an xseg_reply_map that splits [offset, offset+size) by block.

        Only cnt and each entry's offset/size are filled in; targets are
        left for the caller.  Raises Error if the remaining size ever
        becomes negative.
        """
        blocksize = XsegTest.blocksize
        ret = xseg_reply_map()
        # number of blocks touched by the range
        cnt = (offset+size)//blocksize - offset//blocksize
        if (offset+size) % blocksize > 0:
            cnt += 1
        ret.cnt = cnt
        SegsArray = xseg_reply_map_scatterlist * cnt
        segs = SegsArray()
        rem_size = size
        # only the first entry starts inside a block
        offset = offset % blocksize
        for i in range(0, cnt):
            segs[i].offset = offset
            segs[i].size = blocksize - offset
            if segs[i].size > rem_size:
                segs[i].size = rem_size
            offset = 0
            rem_size -= segs[i].size
            if rem_size < 0:
                raise Error("Calculation error")
        ret.segs = segs

        return ret

    @staticmethod
    def get_list_of_hashes(xreply, from_segment=False):
        """Return the target strings of every entry of a map reply.

        With from_segment=True the entries are read as an array of
        xreply.cnt scatterlist items located at the address of xreply.segs.
        """
        hashes = []
        cnt = xreply.cnt
        segs = xreply.segs
        if from_segment:
            SegsArray = xseg_reply_map_scatterlist * cnt
            segs = SegsArray.from_address(ctypes.addressof(segs))
        for i in range(0, cnt):
            hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
        return hashes

    @staticmethod
    def get_zero_map_reply(offset, size):
        """Return get_map_reply(offset, size) with every target set to the
        hex SHA-256 of the empty string."""
        ret = XsegTest.get_map_reply(offset, size)
        for i in range(0, ret.cnt):
            ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
            ret.segs[i].targetlen = len(ret.segs[i].target)
        return ret

    @staticmethod
    def get_copy_map_reply(volume, offset, size, epoch):
        """Return get_map_reply(offset, size) with each target set to the
        object name of (volume, epoch, block index)."""
        blocksize = XsegTest.blocksize
        objidx_start = offset//blocksize
        ret = XsegTest.get_map_reply(offset, size)
        for i in range(0, ret.cnt):
            # resolved on this class rather than on a subclass that is only
            # defined further down the module
            ret.segs[i].target = XsegTest.get_object_name(volume, epoch,
                    objidx_start+i)
            ret.segs[i].targetlen = len(ret.segs[i].target)
        return ret

    def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
            flags=0):
        """Construct (without submitting) a Request for operation `op`."""
        return Request(self.xseg, dst, target, data=data, size=size,
                offset=offset, datalen=datalen, flags=flags, op=op)

    def assert_equal_xseg(self, req, expected_data):
        """Assert that the data of `req` matches a reply structure.

        Handles xseg_reply_info, xseg_reply_map and xseg_reply_hash;
        any other type raises Error.
        """
        if isinstance(expected_data, xseg_reply_info):
            datasize = ctypes.sizeof(expected_data)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            self.assertEqual(data.size, expected_data.size)
        elif isinstance(expected_data, xseg_reply_map):
            # xseg_reply_map ends in a flexible array of
            # xseg_reply_map_scatterlist, so the size of the reply in the
            # segment is computed by subtracting the size of the python
            # object's pointer to the array and adding the entries
            datasize = ctypes.sizeof(expected_data)
            datasize -= ctypes.sizeof(expected_data.segs)
            datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            cnt = data.cnt
            self.assertEqual(data.cnt, expected_data.cnt)
            SegsArray = xseg_reply_map_scatterlist * cnt
            array = SegsArray.from_address(ctypes.addressof(data.segs))
            expected_array = expected_data.segs
            for i in range(0, cnt):
                t = ctypes.string_at(array[i].target, array[i].targetlen)
                self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
                self.assertEqual(t, expected_array[i].target)
                self.assertEqual(array[i].offset, expected_array[i].offset)
                self.assertEqual(array[i].size, expected_array[i].size)
        elif isinstance(expected_data, xseg_reply_hash):
            datasize = ctypes.sizeof(expected_data)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            self.assertEqual(data.targetlen, expected_data.targetlen)
            t = ctypes.string_at(data.target, data.targetlen)
            et = ctypes.string_at(expected_data.target, expected_data.targetlen)
            self.assertEqual(t, et)
        else:
            raise Error("Unknown data type")

    def evaluate_req(self, req, success=True, serviced=None, data=None):
        """Check the outcome of a completed request.

        success=False only asserts that the request failed.  Otherwise the
        request must have succeeded and, when given, `serviced` and `data`
        (a string or one of the reply structures) must match.
        """
        if not success:
            self.assertFalse(req.success())
            return

        self.assertTrue(req.success())
        if serviced is not None:
            self.assertEqual(req.get_serviced(), serviced)
        if data is not None:
            if isinstance(data, basestring):
                datalen = len(data)
                self.assertEqual(datalen, req.get_datalen())
                self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
            else:
                self.assert_equal_xseg(req, data)

    def evaluate(send_func):
        """Wrap a send_* method into a send_and_evaluate_* method.

        Used while the class body is executed.  The wrapper submits the
        request through send_func, waits for it, checks it with
        evaluate_req and puts it back.
        """
        def send_and_evaluate(self, dst, target, expected=True, serviced=None,
                expected_data=None, **kwargs):
            req = send_func(self, dst, target, **kwargs)
            req.wait()
            try:
                self.evaluate_req(req, success=expected, serviced=serviced,
                        data=expected_data)
            finally:
                # hand the request back even when a check above failed
                released = req.put()
            self.assertTrue(released)
        return send_and_evaluate

    def send_write(self, dst, target, data=None, offset=0, datalen=0):
        """Submit a write request and return it."""
        req = Request.get_write_request(self.xseg, dst, target, data=data,
                offset=offset, datalen=datalen)
        req.submit()
        return req

    send_and_evaluate_write = evaluate(send_write)

    def send_read(self, dst, target, size=0, datalen=0, offset=0):
        """Submit a read request and return it."""
        req = Request.get_read_request(self.xseg, dst, target, size=size,
                offset=offset, datalen=datalen)
        req.submit()
        return req

    send_and_evaluate_read = evaluate(send_read)

    def send_info(self, dst, target):
        """Submit an info request and return it."""
        req = Request.get_info_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_info = evaluate(send_info)

    def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
        """Submit a copy request (src_target -> dst_target) and return it."""
        req = Request.get_copy_request(self.xseg, dst, src_target,
                copy_target=dst_target, size=size, offset=offset)
        req.submit()
        return req

    send_and_evaluate_copy = evaluate(send_copy)

    def send_acquire(self, dst, target):
        """Submit an acquire request and return it."""
        req = Request.get_acquire_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_acquire = evaluate(send_acquire)

    def send_release(self, dst, target, force=False):
        """Submit a release request (optionally forced) and return it."""
        req = Request.get_release_request(self.xseg, dst, target, force)
        req.submit()
        return req

    send_and_evaluate_release = evaluate(send_release)

    def send_delete(self, dst, target):
        """Submit a delete request and return it."""
        req = Request.get_delete_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_delete = evaluate(send_delete)

    def send_clone(self, dst, src_target, clone=None, clone_size=0,
            cont_addr=False):
        """Submit a clone request (src_target -> clone) and return it."""
        req = Request.get_clone_request(self.xseg, dst, src_target,
                clone=clone, clone_size=clone_size, cont_addr=cont_addr)
        req.submit()
        return req

    send_and_evaluate_clone = evaluate(send_clone)

    def send_snapshot(self, dst, src_target, snap=None):
        """Submit a snapshot request (src_target -> snap) and return it."""
        req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
        req.submit()
        return req

    send_and_evaluate_snapshot = evaluate(send_snapshot)

    def send_open(self, dst, target):
        """Submit an open request and return it."""
        req = Request.get_open_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_open = evaluate(send_open)

    def send_close(self, dst, target):
        """Submit a close request and return it."""
        req = Request.get_close_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_close = evaluate(send_close)

    def send_map_read(self, dst, target, offset=0, size=0):
        """Submit a map-read request and return it."""
        req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
                size=size)
        req.submit()
        return req

    send_and_evaluate_map_read = evaluate(send_map_read)

    def send_map_write(self, dst, target, offset=0, size=0):
        """Submit a map-write request and return it."""
        req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
                size=size)
        req.submit()
        return req

    send_and_evaluate_map_write = evaluate(send_map_write)

    def send_hash(self, dst, target, size=0):
        """Submit a hash request and return it."""
        req = Request.get_hash_request(self.xseg, dst, target, size=size)
        req.submit()
        return req

    send_and_evaluate_hash = evaluate(send_hash)

    def get_filed(self, args, clean=False):
        """Return a Filed peer built from `args`.

        The directory args['archip_dir'] is created when missing and, with
        clean=True, emptied first.
        """
        path = args['archip_dir']
        if not os.path.exists(path):
            os.makedirs(path)

        if clean:
            recursive_remove(path)

        return Filed(**args)

    def get_sosd(self, args, clean=False):
        """Return a Sosd peer built from `args` on a freshly created pool.

        The rados pool args['pool'] is deleted if it exists and created
        again.  `clean` is accepted but not consulted.
        """
        pool = args['pool']
        import rados
        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
        cluster.connect()
        if cluster.pool_exists(pool):
            cluster.delete_pool(pool)
        cluster.create_pool(pool)

        cluster.shutdown()
        return Sosd(**args)

    def get_mapperd(self, args):
        """Return a Mapperd peer built from `args`."""
        return Mapperd(**args)

    def get_vlmcd(self, args):
        """Return a Vlmcd peer built from `args`."""
        return Vlmcd(**args)
443
class VlmcdTest(XsegTest):
    """Tests that drive a vlmcd peer running on top of a mapperd and two
    filed blockers (one for data, one for maps)."""
    bfiled_args = {
            'role': 'vlmctest-blockerb',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/bfiledtest/',
            'prefix': 'archip_',
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            }
    mfiled_args = {
            'role': 'vlmctest-blockerm',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/mfiledtest/',
            'prefix': 'archip_',
            'portno_start': 1,
            'portno_end': 1,
            'daemon': True,
            'log_level': 3,
            }
    mapperd_args = {
            'role': 'vlmctest-mapper',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 2,
            'portno_end': 2,
            'daemon': True,
            'log_level': 3,
            'blockerb_port': 0,
            'blockerm_port': 1,
            }
    vlmcd_args = {
            'role': 'vlmctest-vlmc',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 3,
            'portno_end': 3,
            'daemon': True,
            'log_level': 3,
            'blocker_port': 0,
            'mapper_port': 2
            }

    def _stop_peers(self):
        """Stop every peer object that exists on this instance, in the
        order vlmcd, mapperd, blockerb, blockerm."""
        for name in ('vlmcd', 'mapperd', 'blockerb', 'blockerm'):
            peer = getattr(self, name, None)
            if peer is not None:
                stop_peer(peer)

    def setUp(self):
        """Create the segment, then build and start the four peers.

        On any failure the peers created so far are stopped, the segment
        is torn down and the original exception is re-raised.
        """
        super(VlmcdTest, self).setUp()
        try:
            self.blockerm = self.get_filed(self.mfiled_args, clean=True)
            self.blockerb = self.get_filed(self.bfiled_args, clean=True)
            self.mapperd = self.get_mapperd(self.mapperd_args)
            self.vlmcd = self.get_vlmcd(self.vlmcd_args)
            self.vlmcdport = self.vlmcd.portno_start
            self.mapperdport = self.mapperd.portno_start
            self.blockerbport = self.blockerb.portno_start
            start_peer(self.blockerm)
            start_peer(self.blockerb)
            start_peer(self.mapperd)
            start_peer(self.vlmcd)
        except Exception as e:
            print(e)
            # only touch peers that were actually constructed, so a
            # failure early in the block is not masked by AttributeError
            self._stop_peers()
            super(VlmcdTest, self).tearDown()
            # bare raise keeps the original traceback
            raise

    def tearDown(self):
        """Stop the peers and tear the segment down."""
        self._stop_peers()
        super(VlmcdTest, self).tearDown()

    def test_open(self):
        """Open fails before the volume exists and succeeds (twice) after."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_open(self.vlmcdport, volume)
        self.send_and_evaluate_open(self.vlmcdport, volume)

    def test_close(self):
        """Close succeeds only once, and only on an opened volume."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_open(self.vlmcdport, volume)
        self.send_and_evaluate_close(self.vlmcdport, volume)
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)

    def test_info(self):
        """Info on a new volume reports the size it was created with."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        xinfo = self.get_reply_info(volsize)
        self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)

    def test_write_read(self):
        """Write/read fail before the volume exists; afterwards written
        data is read back unchanged."""
        datalen = 1024
        data = get_random_string(datalen, 16)
        volume = "myvolume"
        volsize = 10*1024*1024

        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                expected=False)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
                expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                serviced=datalen)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
                expected_data=data)

    def test_clone_snapshot(self):
        """Snapshots and clones keep their own view of the data while the
        origin volume and the clone are written to."""
        volume = "myvolume"
        snap = "mysnapshot"
        # the original assigned "mysnapshot2" and then overwrote it; only
        # the effective value is kept
        snap2 = "mysnapshot3"
        clone1 = "myclone1"

        volsize = 100*1024*1024*1024
        clone2size = 200*1024*1024*1024
        offset = 90*1024*1024*1024
        size = 10*1024*1024

        zeros = '\x00' * size
        data = get_random_string(size, 16)
        data2 = get_random_string(size, 16)

        # fresh volume reads as zeros
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=zeros)

        # first snapshot stays zero after the volume is written
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
                serviced=size)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=data)

        # second snapshot captures the data; a larger clone of it sees the
        # data and zeros beyond the original size
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
                clone_size=clone2size)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=volsize+offset, expected_data=zeros)

        # writing to the clone affects nothing else
        self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
                                offset=offset, serviced=size)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=offset, expected_data=data2)
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)

    def test_info2(self):
        """Two info requests in flight at once both report the volume size."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        xinfo = self.get_reply_info(volsize)
        reqs = Set([])
        reqs.add(self.send_info(self.vlmcdport, volume))
        reqs.add(self.send_info(self.vlmcdport, volume))
        while len(reqs) > 0:
            req = self.xseg.wait_requests(reqs)
            try:
                self.evaluate_req(req, data=xinfo)
            finally:
                reqs.remove(req)
                # hand the request back, as the send_and_evaluate_*
                # helpers do
                req.put()

    def test_hash(self):
        """The hash of a snapshot equals the merkle hash of its blocks and
        can be used to create a content-addressed clone."""
        blocksize = self.blocksize
        volume = "myvolume"
        volume2 = "myvolume2"
        snap = "snapshot"
        volsize = 10*1024*1024
        size = 512*1024
        offset = 0
        data = get_random_string(size, 16)

        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                                offset=offset, serviced=size)

        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)

        # hashing the volume itself is expected to fail; the snapshot works
        self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
                expected=False)
        req = self.send_hash(self.mapperdport, snap, size=volsize)
        req.wait()
        self.assertTrue(req.success())
        xreply = req.get_data(xseg_reply_hash).contents
        hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
        req.put()

        req = self.send_map_read(self.mapperdport, snap, offset=0,
                size=volsize)
        req.wait()
        self.assertTrue(req.success())
        xreply = req.get_data(xseg_reply_map).contents
        blocks = self.get_list_of_hashes(xreply, from_segment=True)
        req.put()

        # blocks named after the hash of the empty string are used as-is;
        # every other block is hashed by the data blocker
        empty_block = sha256(b'').hexdigest()
        h = []
        for b in blocks:
            if b == empty_block:
                h.append(unhexlify(b))
                continue
            req = self.send_hash(self.blockerbport, b, size=blocksize)
            req.wait()
            self.assertTrue(req.success())
            xreply = req.get_data(xseg_reply_hash).contents
            h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
            req.put()

        mh = hexlify(merkle_hash(h))
        self.assertEqual(hash_map, mh)

        # cloning from the hash only works with cont_addr=True
        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
                clone_size=volsize * 2, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
                clone_size=volsize * 2, cont_addr=True)
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
                offset=offset + size, expected_data='\x00' * (volsize - size))
690
691
692 class MapperdTest(XsegTest):
693     bfiled_args = {
694             'role': 'mappertest-blockerb',
695             'spec': XsegTest.spec,
696             'nr_ops': 16,
697             'archip_dir': '/tmp/bfiledtest/',
698             'prefix': 'archip_',
699             'portno_start': 0,
700             'portno_end': 0,
701             'daemon': True,
702             'log_level': 3,
703             }
704     mfiled_args = {
705             'role': 'mappertest-blockerm',
706             'spec': XsegTest.spec,
707             'nr_ops': 16,
708             'archip_dir': '/tmp/mfiledtest/',
709             'prefix': 'archip_',
710             'portno_start': 1,
711             'portno_end': 1,
712             'daemon': True,
713             'log_level': 3,
714             }
715     mapperd_args = {
716             'role': 'mappertest-mapper',
717             'spec': XsegTest.spec,
718             'nr_ops': 16,
719             'portno_start': 2,
720             'portno_end': 2,
721             'daemon': True,
722             'log_level': 3,
723             'blockerb_port': 0,
724             'blockerm_port': 1,
725             }
726     blocksize = 4*1024*1024
727
728     def setUp(self):
729         super(MapperdTest, self).setUp()
730         try:
731             self.blockerm = self.get_filed(self.mfiled_args, clean=True)
732             self.blockerb = self.get_filed(self.bfiled_args, clean=True)
733             self.mapperd = self.get_mapperd(self.mapperd_args)
734             self.mapperdport = self.mapperd.portno_start
735             start_peer(self.blockerm)
736             start_peer(self.blockerb)
737             start_peer(self.mapperd)
738         except Exception as e:
739             print e
740             stop_peer(self.mapperd)
741             stop_peer(self.blockerb)
742             stop_peer(self.blockerm)
743             super(MapperdTest, self).tearDown()
744             raise e
745
746     def tearDown(self):
747         stop_peer(self.mapperd)
748         stop_peer(self.blockerb)
749         stop_peer(self.blockerm)
750         super(MapperdTest, self).tearDown()
751
752     def test_create(self):
753         volume = "myvolume"
754         volsize = 10*1024*1024
755         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
756                 clone_size=0, expected=False)
757         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
758                 clone_size=volsize)
759         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
760                 clone_size=volsize, expected=False)
761
762     def test_delete(self):
763         volume = "myvolume"
764         volsize = 10*1024*1024
765         offset = 0
766         size = 10
767         epoch = 2
768
769         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
770                 clone_size=0, expected=False)
771         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
772                 clone_size=volsize)
773         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
774                 clone_size=volsize, expected=False)
775         self.send_and_evaluate_delete(self.mapperdport, volume)
776         self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
777         self.send_and_evaluate_map_write(self.mapperdport, volume,
778                 offset=offset, size=size, expected=False)
779         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
780                 clone_size=volsize)
781
782         ret = self.get_copy_map_reply(volume, offset, size, epoch)
783
784         self.send_and_evaluate_map_write(self.mapperdport, volume,
785                 expected_data=ret, offset=offset, size=size)
786
787     def test_clone_snapshot(self):
788         volume = "myvolume"
789         snap = "mysnapshot"
790         snap2 = "mysnapshot2"
791         snap2 = "mysnapshot3"
792         clone1 = "myclone1"
793         clone2 = "myclone2"
794         volsize = 100*1024*1024*1024
795         clone2size = 200*1024*1024*1024
796         offset = 90*1024*1024*1024
797         size = 10*1024*1024
798         offset = 0
799         size = volsize
800         epoch = 2
801
802         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
803                 clone_size=volsize)
804         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
805         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
806                 expected=False)
807         xinfo = self.get_reply_info(volsize)
808         self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
809         ret = self.get_zero_map_reply(offset, size)
810         self.send_and_evaluate_map_read(self.mapperdport, volume,
811                 expected_data=ret, offset=offset, size=size)
812         self.send_and_evaluate_map_read(self.mapperdport, snap,
813                 expected_data=ret, offset=offset, size=size)
814
815         ret = self.get_copy_map_reply(volume, offset, size, epoch)
816         self.send_and_evaluate_map_write(self.mapperdport, volume,
817                 expected_data=ret, offset=offset, size=size)
818
819         stop_peer(self.mapperd)
820         start_peer(self.mapperd)
821         self.send_and_evaluate_map_read(self.mapperdport, volume,
822                 expected_data=ret, offset=offset, size=size)
823
824         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
825         xinfo = self.get_reply_info(volsize)
826         self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
827         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
828                 clone_size=2, expected=False)
829         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
830                 clone_size=clone2size)
831         xinfo = self.get_reply_info(clone2size)
832         self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
833
834     def test_info(self):
835         volume = "myvolume"
836         volsize = 10*1024*1024
837         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
838                 clone_size=volsize)
839         xinfo = self.get_reply_info(volsize)
840         self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
841
842     def test_info2(self):
843         volume = "myvolume"
844         volsize = 10*1024*1024
845         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
846                 clone_size=volsize)
847         xinfo = self.get_reply_info(volsize)
848         reqs = Set([])
849         reqs.add(self.send_info(self.mapperdport, volume))
850         reqs.add(self.send_info(self.mapperdport, volume))
851         while len(reqs) > 0:
852             req = self.xseg.wait_requests(reqs)
853             self.evaluate_req(req, data=xinfo)
854             reqs.remove(req)
855
856     def test_open(self):
857         volume = "myvolume"
858         volsize = 10*1024*1024
859         self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
860         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
861                 clone_size=volsize)
862         self.send_and_evaluate_open(self.mapperdport, volume)
863         self.send_and_evaluate_open(self.mapperdport, volume)
864
865     def test_open2(self):
866         volume = "myvolume"
867         volsize = 10*1024*1024
868         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
869                 clone_size=volsize)
870         reqs = Set([])
871         reqs.add(self.send_open(self.mapperdport, volume))
872         reqs.add(self.send_open(self.mapperdport, volume))
873         while len(reqs) > 0:
874             req = self.xseg.wait_requests(reqs)
875             self.evaluate_req(req)
876             reqs.remove(req)
877
878     def test_close(self):
879         volume = "myvolume"
880         volsize = 10*1024*1024
881         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
882         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
883                 clone_size=volsize)
884         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
885         self.send_and_evaluate_open(self.mapperdport, volume)
886         self.send_and_evaluate_close(self.mapperdport, volume)
887         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
888
889     def test_mapr(self):
890         volume = "myvolume"
891         volsize = 10*1024*1024
892         offset = 0
893         size = volsize
894         ret = MapperdTest.get_zero_map_reply(offset, size)
895         self.send_and_evaluate_map_read(self.mapperdport, volume,
896                 offset=offset, size=size, expected=False)
897         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
898                 clone_size=volsize)
899         self.send_and_evaluate_map_read(self.mapperdport, volume,
900                 expected_data=ret, offset=offset, size=size)
901         offset = volsize - 1
902         self.send_and_evaluate_map_read(self.mapperdport, volume,
903                 offset=offset, size=size, expected=False)
904         offset = volsize + 1
905         self.send_and_evaluate_map_read(self.mapperdport, volume,
906                 offset=offset, size=size, expected=False)
907
908     def test_mapr2(self):
909         volume = "myvolume"
910         volsize = 10*1024*1024
911         offset = 0
912         size = volsize
913
914         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
915                 clone_size=volsize)
916         reqs = Set([])
917         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
918             size=size))
919         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
920             size=size))
921         ret = MapperdTest.get_zero_map_reply(offset, size)
922         while len(reqs) > 0:
923             req = self.xseg.wait_requests(reqs)
924             self.evaluate_req(req, data=ret)
925             reqs.remove(req)
926
927     def test_mapw(self):
928         blocksize = self.blocksize
929         volume = "myvolume"
930         volsize = 100*1024*1024*1024
931         offset = 90*1024*1024*1024 - 2
932         size = 512*1024
933         epoch = 1
934
935         ret = self.get_copy_map_reply(volume, offset, size, epoch)
936
937         self.send_and_evaluate_map_write(self.mapperdport, volume,
938                 offset=offset, size=size, expected=False)
939         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
940                 clone_size=volsize)
941         self.send_and_evaluate_map_write(self.mapperdport, volume,
942                 expected_data=ret, offset=offset, size=size)
943         self.send_and_evaluate_map_read(self.mapperdport, volume,
944                 expected_data = ret, offset=offset, size=size)
945         stop_peer(self.mapperd)
946         start_peer(self.mapperd)
947         self.send_and_evaluate_map_read(self.mapperdport, volume,
948                 expected_data=ret, offset=offset, size=size)
949         self.send_and_evaluate_open(self.mapperdport, volume)
950         offset = 101*1024*1024*1024
951         self.send_and_evaluate_map_write(self.mapperdport, volume,
952                 offset=offset, size=size, expected=False)
953         offset = 100*1024*1024*1024 - 1
954         self.send_and_evaluate_map_write(self.mapperdport, volume,
955                 offset=offset, size=size, expected=False)
956
957     def test_mapw2(self):
958         blocksize = self.blocksize
959         volume = "myvolume"
960         volsize = 100*1024*1024*1024
961         offset = 90*1024*1024*1024 - 2
962         size = 512*1024
963         epoch = 1
964
965         ret = self.get_copy_map_reply(volume, offset, size, epoch)
966
967         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
968                 clone_size=volsize)
969
970         reqs = Set([])
971         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
972             size=size))
973         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
974             size=size))
975         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
976             size=size))
977         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
978             size=size))
979         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
980             size=size))
981         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
982             size=size))
983         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
984             size=size))
985         while len(reqs) > 0:
986             req = self.xseg.wait_requests(reqs)
987             self.evaluate_req(req, data=ret)
988             reqs.remove(req)
989
class BlockerTest(object):
    """Test cases shared by the blocker peers.

    This class defines no fixtures of its own: it reads self.blocker and
    self.blockerport and calls the send_and_evaluate_* / get_*_reply
    helpers, all of which the concrete test class it is mixed into has to
    provide.
    """

    def test_write_read(self):
        port = self.blockerport
        length = 1024
        payload = get_random_string(length, 16)
        name = "mytarget"
        expected_info = self.get_reply_info(length)

        self.send_and_evaluate_write(port, name, data=payload,
                                     serviced=length)
        self.send_and_evaluate_read(port, name, size=length,
                                    expected_data=payload)
        self.send_and_evaluate_info(port, name, expected_data=expected_info)

        # Restart the peer and repeat the read and info checks.
        stop_peer(self.blocker)
        start_peer(self.blocker)
        self.send_and_evaluate_read(port, name, size=length,
                                    expected_data=payload)
        self.send_and_evaluate_info(port, name, expected_data=expected_info)

    def test_info(self):
        port = self.blockerport
        length = 1024
        payload = get_random_string(length, 16)
        name = "mytarget"
        self.send_and_evaluate_write(port, name, data=payload,
                                     serviced=length)
        # Info is compared with the reply built for the written length.
        expected_info = self.get_reply_info(length)
        self.send_and_evaluate_info(port, name, expected_data=expected_info)

    def test_copy(self):
        port = self.blockerport
        length = 1024
        payload = get_random_string(length, 16)
        source_name = "mytarget"
        copy_name = "copy_target"

        self.send_and_evaluate_write(port, source_name, data=payload,
                                     serviced=length)
        self.send_and_evaluate_read(port, source_name, size=length,
                                    expected_data=payload, serviced=length)
        # Copy the object, then read the copy back against the same payload.
        self.send_and_evaluate_copy(port, source_name, dst_target=copy_name,
                                    size=length, serviced=length)
        self.send_and_evaluate_read(port, copy_name, size=length,
                                    expected_data=payload)

    def test_delete(self):
        port = self.blockerport
        length = 1024
        payload = get_random_string(length, 16)
        name = "mytarget"

        # The third positional argument of the delete helper is False
        # before the object is written and True afterwards.
        self.send_and_evaluate_delete(port, name, False)
        self.send_and_evaluate_write(port, name, data=payload)
        self.send_and_evaluate_read(port, name, size=length,
                                    expected_data=payload)
        self.send_and_evaluate_delete(port, name, True)

        # After the delete, the read is compared with all-zero data.
        zeros = '\x00' * length
        self.send_and_evaluate_read(port, name, size=length,
                                    expected_data=zeros)

    def test_hash(self):
        port = self.blockerport
        length = 1024

        # First object: nothing but zero bytes.  The expected digest is
        # computed over the data with its trailing NUL bytes stripped.
        payload = '\x00' * length
        name = "target_zeros"
        self.send_and_evaluate_write(port, name, data=payload,
                                     serviced=length)
        digest = sha256(payload.rstrip('\x00')).hexdigest()
        expected = self.get_hash_reply(digest)
        self.send_and_evaluate_hash(port, name, size=length,
                                    expected_data=expected, serviced=length)

        # Second object: random contents, hashed three times in a row with
        # the same expected reply.
        name = "mytarget"
        payload = get_random_string(length, 16)
        self.send_and_evaluate_write(port, name, data=payload,
                                     serviced=length)
        digest = sha256(payload.rstrip('\x00')).hexdigest()
        expected = self.get_hash_reply(digest)
        for _ in range(3):
            self.send_and_evaluate_hash(port, name, size=length,
                                        expected_data=expected,
                                        serviced=length)

    def test_locking(self):
        port = self.blockerport
        name = "mytarget"

        # Acquire twice, release (expected=True), release again
        # (expected=False), then acquire once more.
        for _ in range(2):
            self.send_and_evaluate_acquire(port, name, expected=True)
        self.send_and_evaluate_release(port, name, expected=True)
        self.send_and_evaluate_release(port, name, expected=False)
        self.send_and_evaluate_acquire(port, name, expected=True)

        # After restarting the peer, plain acquire and release are sent
        # with expected=False; only the forced release has expected=True.
        stop_peer(self.blocker)
        start_peer(self.blocker)
        self.send_and_evaluate_acquire(port, name, expected=False)
        self.send_and_evaluate_release(port, name, expected=False)
        self.send_and_evaluate_release(port, name, force=True,
                                       expected=True)
1082
1083
class FiledTest(BlockerTest, XsegTest):
    """Run the shared BlockerTest cases against a filed peer."""

    # Arguments passed to get_filed() in setUp; test_locking copies them to
    # start a second Filed instance.
    filed_args = {
            'role': 'testfiled',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/filedtest/',
            'prefix': 'archip_',
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            }

    def setUp(self):
        """Create and start the filed peer under test.

        If creating or starting the peer fails, the base tearDown is run
        and the original exception is re-raised.
        """
        super(FiledTest, self).setUp()
        try:
            self.blocker = self.get_filed(self.filed_args, clean=True)
            self.blockerport = self.blocker.portno_start
            start_peer(self.blocker)
        except Exception:
            super(FiledTest, self).tearDown()
            # A bare raise keeps the traceback of the original failure;
            # "raise e" would restart it at this line under Python 2.
            raise

    def tearDown(self):
        """Stop the filed peer, then run the base tearDown."""
        stop_peer(self.blocker)
        super(FiledTest, self).tearDown()

    def test_locking(self):
        # Overrides BlockerTest.test_locking: instead of restarting the same
        # peer object, a new Filed with a different 'unique_str' is started.
        target = "mytarget"
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        self.send_and_evaluate_release(self.blockerport, target, expected=True)
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        stop_peer(self.blocker)
        # Copy so that the class-level filed_args dict is left untouched.
        new_filed_args = copy(self.filed_args)
        new_filed_args['unique_str'] = 'ThisisSparta'
        # Rebinding self.blocker means tearDown stops the new peer.
        self.blocker = Filed(**new_filed_args)
        start_peer(self.blocker)
        # With the new peer, plain acquire and release are sent with
        # expected=False; only the forced release has expected=True.
        self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
        self.send_and_evaluate_release(self.blockerport, target, force=True,
                expected=True)
1127
class SosdTest(BlockerTest, XsegTest):
    """Run the shared BlockerTest cases against a sosd peer."""

    # NOTE: despite the name, these are the arguments passed to get_sosd().
    filed_args = {
            'role': 'testsosd',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            'pool': 'test_sosd',
            'nr_threads': 3,
            }

    def setUp(self):
        """Create and start the sosd peer under test.

        If creating or starting the peer fails, the base tearDown is run
        and the original exception is re-raised.
        """
        super(SosdTest, self).setUp()
        try:
            self.blocker = self.get_sosd(self.filed_args, clean=True)
            self.blockerport = self.blocker.portno_start
            start_peer(self.blocker)
        except Exception:
            super(SosdTest, self).tearDown()
            # A bare raise keeps the traceback of the original failure;
            # "raise e" would restart it at this line under Python 2.
            raise

    def tearDown(self):
        """Stop the sosd peer, then run the base tearDown."""
        stop_peer(self.blocker)
        super(SosdTest, self).tearDown()
1154
if __name__ == '__main__':
    # When run as a script: call init() first, then hand control to the
    # unittest runner.
    init()
    unittest.main()