vlmc: add cont_addr option in snapshot command
[archipelago] / xseg / tools / qa / tests.py
1 # Copyright 2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import archipelago
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
36         create_segment, destroy_segment, Error
37 from archipelago.archipelago import start_peer, stop_peer
38 import random as rnd
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
42 import ctypes
43 import os
44 from copy import copy
45 from sets import Set
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
48
def get_random_string(length=64, repeat=16):
    """Return a string of lowercase letters built by tiling a random unit.

    A unit of ``repeat`` random letters (a-z) is drawn once and repeated
    ``length // repeat`` times; the first ``length % repeat`` characters of
    the unit are appended so that, for a non-negative ``length`` and a
    positive ``repeat``, the result is ``length`` characters long.
    """
    # Divide first so that a zero ``repeat`` fails before any random number
    # is drawn.
    full_copies = length // repeat
    unit = ''.join([chr(ord('a') + rnd.randint(0, 25)) for _ in range(repeat)])
    tail = unit[0:length % repeat]
    return unit * full_copies + tail
64
def recursive_remove(path):
    """Delete everything below ``path`` while keeping ``path`` itself.

    The tree is walked bottom-up so that every directory is already empty
    when it is removed.
    """
    for root, dirs, files in os.walk(path, topdown=False):
        for name in files:
            os.remove(os.path.join(root, name))
        for name in dirs:
            entry = os.path.join(root, name)
            # os.walk reports symlinks to directories in ``dirs`` without
            # descending into them; rmdir() fails on a link, so unlink it
            # and leave the link target untouched.
            if os.path.islink(entry):
                os.remove(entry)
            else:
                os.rmdir(entry)
71
def merkle_hash(hashes):
    """Return the Merkle root of a list of binary digests.

    An empty list yields the SHA-256 digest of the empty string and a
    single-element list yields that element unchanged.  Otherwise the list
    is padded with all-zero entries (each as long as the first digest) up to
    the next power of two, and adjacent pairs are concatenated and hashed
    with SHA-256 until one digest remains.

    The list passed in is not modified.
    """
    if len(hashes) == 0:
        return sha256(b'').digest()
    if len(hashes) == 1:
        return hashes[0]

    # Smallest power of two that can hold every leaf.
    width = 2
    while width < len(hashes):
        width *= 2

    # Pad a copy: ``hashes += ...`` would extend the caller's list in place.
    padding = [b'\x00' * len(hashes[0])] * (width - len(hashes))
    level = list(hashes) + padding
    while len(level) > 1:
        level = [sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]
85     
86
def init():
    """Seed the RNG and point archipelago at paths under the working directory.

    The binaries directory, the logs directory and the pidfile directory are
    all derived from the current working directory.  The logs and pidfile
    directories are created when missing, and the contents of the logs
    directory are then removed.
    """
    rnd.seed()

    cwd = os.getcwd()
    common = archipelago.common
    common.BIN_DIR = os.path.join(cwd, '../../peers/user/')
    common.LOGS_PATH = os.path.join(cwd, 'logs')
    common.PIDFILE_PATH = os.path.join(cwd, 'pids')

    for directory in (common.LOGS_PATH, common.PIDFILE_PATH):
        if not os.path.isdir(directory):
            os.makedirs(directory)

    # Start every run with an empty logs directory.
    recursive_remove(common.LOGS_PATH)
98
class XsegTest(unittest.TestCase):
    """Common fixture and helpers for the xseg peer tests.

    ``setUp`` creates the segment described by ``spec`` and attaches an
    ``Xseg_ctx`` to it; ``tearDown`` shuts the context down and destroys the
    segment.  The remaining helpers build expected reply structures, submit
    requests to a destination port and check their outcome.
    """

    xseg = None
    # Port number handed to Xseg_ctx for this test process.
    myport = 15
    spec = "posix:testsegment:16:256:12".encode()
    # Block size used when computing the expected map replies.
    blocksize = 4*1024*1024

    def setUp(self):
        """Create the test segment and attach an ``Xseg_ctx`` to it."""
        try:
            create_segment(self.spec)
        except Exception:
            # Creation failed -- presumably a segment left behind by an
            # earlier run; destroy it and try exactly once more.
            destroy_segment(self.spec)
            create_segment(self.spec)
        self.xseg = Xseg_ctx(self.spec, self.myport)

    def tearDown(self):
        """Shut down the context (if one was created) and destroy the segment."""
        if self.xseg:
            self.xseg.shutdown()
        destroy_segment(self.spec)

    @staticmethod
    def get_reply_info(size):
        """Return an ``xseg_reply_info`` whose ``size`` field is ``size``."""
        xinfo = xseg_reply_info()
        xinfo.size = size
        return xinfo

    @staticmethod
    def get_hash_reply(hashstring):
        """Return an ``xseg_reply_hash`` carrying ``hashstring`` as target."""
        xhash = xseg_reply_hash()
        xhash.target = hashstring
        xhash.targetlen = len(hashstring)
        return xhash

    @staticmethod
    def get_object_name(volume, epoch, index):
        """Build the object name for block ``index`` of ``volume`` at ``epoch``.

        The name is ``archip_<volume>_<epoch>_<index>`` where both numbers
        are rendered as the hex dump of their in-memory 64-bit unsigned
        representation (i.e. in the byte order of the running machine).
        """
        def native_hex(value):
            number = ctypes.c_uint64(value)
            raw = ctypes.string_at(ctypes.addressof(number),
                                   ctypes.sizeof(ctypes.c_uint64))
            return hexlify(raw)

        return ("archip_" + volume + "_" + native_hex(epoch) + "_" +
                native_hex(index))

    @staticmethod
    def get_map_reply(offset, size):
        """Return an ``xseg_reply_map`` covering ``size`` bytes from ``offset``.

        One scatterlist entry is produced per block touched by the range;
        each entry holds the offset inside its block and the number of bytes
        of the range that fall in that block.  Targets are left for the
        caller to fill in.
        """
        blocksize = XsegTest.blocksize
        ret = xseg_reply_map()
        # Number of blocks spanned by [offset, offset + size).
        cnt = (offset+size)//blocksize - offset//blocksize
        if (offset+size) % blocksize > 0:
            cnt += 1
        ret.cnt = cnt
        SegsArray = xseg_reply_map_scatterlist * cnt
        segs = SegsArray()
        rem_size = size
        # Only the first block can start at a non-zero in-block offset.
        offset = offset % blocksize
        for i in range(0, cnt):
            segs[i].offset = offset
            segs[i].size = blocksize - offset
            if segs[i].size > rem_size:
                segs[i].size = rem_size
            offset = 0
            rem_size -= segs[i].size
            if rem_size < 0:
                raise Error("Calculation error")
        ret.segs = segs

        return ret

    @staticmethod
    def get_list_of_hashes(xreply, from_segment=False):
        """Return the target strings of every scatterlist entry in ``xreply``.

        With ``from_segment`` set, the entries are read as an array of
        ``xreply.cnt`` scatterlists located at the address of ``xreply.segs``
        instead of through ``xreply.segs`` directly.
        """
        hashes = []
        cnt = xreply.cnt
        segs = xreply.segs
        if from_segment:
            SegsArray = xseg_reply_map_scatterlist * cnt
            segs = SegsArray.from_address(ctypes.addressof(segs))
        for i in range(0, cnt):
            hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
        return hashes

    @staticmethod
    def get_zero_map_reply(offset, size):
        """Return the map reply expected for a range that was never written.

        Every entry targets the hex SHA-256 digest of the empty string.
        """
        ret = XsegTest.get_map_reply(offset, size)
        cnt = ret.cnt
        for i in range(0, cnt):
            ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
            ret.segs[i].targetlen = len(ret.segs[i].target)
        return ret

    @staticmethod
    def get_copy_map_reply(volume, offset, size, epoch):
        """Return the map reply expected once the range has been copied.

        Entry ``i`` targets the object name of block ``offset // blocksize
        + i`` of ``volume`` at ``epoch``.
        """
        blocksize = XsegTest.blocksize
        objidx_start = offset//blocksize
        ret = XsegTest.get_map_reply(offset, size)
        cnt = ret.cnt
        for i in range(0, cnt):
            # NOTE(review): the name is looked up through MapperdTest even
            # though get_object_name is defined on this class -- presumably
            # interchangeable; confirm before changing.
            ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
                    objidx_start+i)
            ret.segs[i].targetlen = len(ret.segs[i].target)
        return ret

    def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
            flags=0):
        """Build (without submitting) a generic ``Request`` for ``op``."""
        return Request(self.xseg, dst, target, data=data, size=size,
                offset=offset, datalen=datalen, flags=flags, op=op)

    def assert_equal_xseg(self, req, expected_data):
        """Assert that the data of ``req`` matches the reply ``expected_data``.

        Supports ``xseg_reply_info``, ``xseg_reply_map`` and
        ``xseg_reply_hash``; any other type raises ``Error``.
        """
        if isinstance(expected_data, xseg_reply_info):
            datasize = ctypes.sizeof(expected_data)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            self.assertEqual(data.size, expected_data.size)
        elif isinstance(expected_data, xseg_reply_map):
            #since xseg_reply_map uses a flexible array for the
            #xseg_reply_map_scatterlist reply, we calculate the size of the
            #reply in the segment, by subtracting the size of the pointer to
            #the array, in the python object
            datasize = ctypes.sizeof(expected_data)
            datasize -= ctypes.sizeof(expected_data.segs)
            datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            cnt = data.cnt
            self.assertEqual(data.cnt, expected_data.cnt)
            segs = data.segs
            # Reinterpret the memory at ``segs`` as ``cnt`` scatterlists.
            SegsArray = xseg_reply_map_scatterlist * cnt
            array = SegsArray.from_address(ctypes.addressof(segs))
            expected_array = expected_data.segs
            for i in range(0, cnt):
                t = ctypes.string_at(array[i].target, array[i].targetlen)
                self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
                self.assertEqual(t, expected_array[i].target)
                self.assertEqual(array[i].offset, expected_array[i].offset)
                self.assertEqual(array[i].size, expected_array[i].size)
        elif isinstance(expected_data, xseg_reply_hash):
            datasize = ctypes.sizeof(expected_data)
            self.assertEqual(datasize, req.get_datalen())
            data = req.get_data(type(expected_data)).contents
            self.assertEqual(data.targetlen, expected_data.targetlen)
            t = ctypes.string_at(data.target, data.targetlen)
            et = ctypes.string_at(expected_data.target, expected_data.targetlen)
            self.assertEqual(t, et)
        else:
            raise Error("Unknown data type")

    def evaluate_req(self, req, success=True, serviced=None, data=None):
        """Check the outcome of a completed request.

        When ``success`` is false only the failure of ``req`` is asserted.
        Otherwise the request must have succeeded and, when given, its
        serviced byte count must equal ``serviced`` and its data must equal
        ``data`` (a plain string or one of the reply structures handled by
        ``assert_equal_xseg``).
        """
        if not success:
            self.assertFalse(req.success())
            return

        self.assertTrue(req.success())
        if serviced is not None:
            self.assertEqual(req.get_serviced(), serviced)
        if data is not None:
            if isinstance(data, basestring):
                datalen = len(data)
                self.assertEqual(datalen, req.get_datalen())
                self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
            else:
                self.assert_equal_xseg(req, data)

    def evaluate(send_func):
        """Wrap a ``send_*`` method so that it also waits for and checks the reply.

        This is called as a plain function while the class body executes.
        The wrapper sends the request, waits for it, evaluates it against
        ``expected`` / ``serviced`` / ``expected_data`` and finally puts it
        back; any other keyword argument is forwarded to ``send_func``.
        """
        def send_and_evaluate(self, dst, target, expected=True, serviced=None,
                expected_data=None, **kwargs):
            req = send_func(self, dst, target, **kwargs)
            req.wait()
            self.evaluate_req(req, success=expected, serviced=serviced,
                    data=expected_data)
            self.assertTrue(req.put())
        return send_and_evaluate

    def send_write(self, dst, target, data=None, offset=0, datalen=0):
        """Submit a write request and return it."""
        req = Request.get_write_request(self.xseg, dst, target, data=data,
                offset=offset, datalen=datalen)
        req.submit()
        return req

    send_and_evaluate_write = evaluate(send_write)

    def send_read(self, dst, target, size=0, datalen=0, offset=0):
        """Submit a read request and return it."""
        req = Request.get_read_request(self.xseg, dst, target, size=size,
                offset=offset, datalen=datalen)
        req.submit()
        return req

    send_and_evaluate_read = evaluate(send_read)

    def send_info(self, dst, target):
        """Submit an info request and return it."""
        req = Request.get_info_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_info = evaluate(send_info)

    def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
        """Submit a copy request from ``src_target`` to ``dst_target``."""
        req = Request.get_copy_request(self.xseg, dst, src_target,
                copy_target=dst_target, size=size, offset=offset)
        req.submit()
        return req

    send_and_evaluate_copy = evaluate(send_copy)

    def send_acquire(self, dst, target):
        """Submit an acquire request and return it."""
        req = Request.get_acquire_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_acquire = evaluate(send_acquire)

    def send_release(self, dst, target, force=False):
        """Submit a release request, passing ``force`` through, and return it."""
        req = Request.get_release_request(self.xseg, dst, target, force)
        req.submit()
        return req

    send_and_evaluate_release = evaluate(send_release)

    def send_delete(self, dst, target):
        """Submit a delete request and return it."""
        req = Request.get_delete_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_delete = evaluate(send_delete)

    def send_clone(self, dst, src_target, clone=None, clone_size=0,
            cont_addr=False):
        """Submit a clone request for ``src_target`` and return it."""
        req = Request.get_clone_request(self.xseg, dst, src_target,
                clone=clone, clone_size=clone_size, cont_addr=cont_addr)
        req.submit()
        return req

    send_and_evaluate_clone = evaluate(send_clone)

    def send_snapshot(self, dst, src_target, snap=None):
        """Submit a snapshot request for ``src_target`` and return it."""
        req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
        req.submit()
        return req

    send_and_evaluate_snapshot = evaluate(send_snapshot)

    def send_open(self, dst, target):
        """Submit an open request and return it."""
        req = Request.get_open_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_open = evaluate(send_open)

    def send_close(self, dst, target):
        """Submit a close request and return it."""
        req = Request.get_close_request(self.xseg, dst, target)
        req.submit()
        return req

    send_and_evaluate_close = evaluate(send_close)

    def send_map_read(self, dst, target, offset=0, size=0):
        """Submit a map-read request and return it."""
        req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
                size=size)
        req.submit()
        return req

    send_and_evaluate_map_read = evaluate(send_map_read)

    def send_map_write(self, dst, target, offset=0, size=0):
        """Submit a map-write request and return it."""
        req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
                size=size)
        req.submit()
        return req

    send_and_evaluate_map_write = evaluate(send_map_write)

    def send_hash(self, dst, target, size=0):
        """Submit a hash request and return it."""
        req = Request.get_hash_request(self.xseg, dst, target, size=size)
        req.submit()
        return req

    send_and_evaluate_hash = evaluate(send_hash)

    def get_filed(self, args, clean=False):
        """Return a ``Filed`` peer built from ``args``.

        The directory ``args['archip_dir']`` is created when missing and,
        with ``clean`` set, emptied first.
        """
        path = args['archip_dir']
        if not os.path.exists(path):
            os.makedirs(path)

        if clean:
            recursive_remove(path)

        return Filed(**args)

    def get_sosd(self, args, clean=False):
        """Return a ``Sosd`` peer built from ``args`` on a freshly created pool.

        The pool ``args['pool']`` is deleted if it exists and then created
        again.  ``clean`` is accepted for symmetry with ``get_filed`` but is
        not consulted: the pool is always recreated.
        """
        pool = args['pool']
        import rados
        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
        cluster.connect()
        try:
            if cluster.pool_exists(pool):
                cluster.delete_pool(pool)
            cluster.create_pool(pool)
        finally:
            # Release the cluster handle even when a pool operation fails.
            cluster.shutdown()
        return Sosd(**args)

    def get_mapperd(self, args):
        """Return a ``Mapperd`` peer built from ``args``."""
        return Mapperd(**args)

    def get_vlmcd(self, args):
        """Return a ``Vlmcd`` peer built from ``args``."""
        return Vlmcd(**args)
441
class VlmcdTest(XsegTest):
    """Tests that drive a vlmc peer backed by a mapper and two file blockers.

    The blockers listen on ports 0 and 1, the mapper on port 2 and the vlmc
    peer on port 3 of the test segment.
    """

    bfiled_args = {
            'role': 'vlmctest-blockerb',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/bfiledtest/',
            'prefix': 'archip_',
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            }
    mfiled_args = {
            'role': 'vlmctest-blockerm',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/mfiledtest/',
            'prefix': 'archip_',
            'portno_start': 1,
            'portno_end': 1,
            'daemon': True,
            'log_level': 3,
            }
    mapperd_args = {
            'role': 'vlmctest-mapper',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 2,
            'portno_end': 2,
            'daemon': True,
            'log_level': 3,
            'blockerb_port': 0,
            'blockerm_port': 1,
            }
    vlmcd_args = {
            'role': 'vlmctest-vlmc',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 3,
            'portno_end': 3,
            'daemon': True,
            'log_level': 3,
            'blocker_port': 0,
            'mapper_port': 2
            }

    def _stop_peers(self):
        """Stop every peer object that exists, in reverse start order."""
        for name in ('vlmcd', 'mapperd', 'blockerb', 'blockerm'):
            # A peer attribute is missing when setUp failed before creating it.
            peer = getattr(self, name, None)
            if peer is not None:
                stop_peer(peer)

    def setUp(self):
        """Create the segment, then create and start all four peers.

        If anything fails, the peers created so far are stopped and the
        segment is torn down before the original error is re-raised.
        """
        super(VlmcdTest, self).setUp()
        try:
            self.blockerm = self.get_filed(self.mfiled_args, clean=True)
            self.blockerb = self.get_filed(self.bfiled_args, clean=True)
            self.mapperd = self.get_mapperd(self.mapperd_args)
            self.vlmcd = self.get_vlmcd(self.vlmcd_args)
            self.vlmcdport = self.vlmcd.portno_start
            self.mapperdport = self.mapperd.portno_start
            self.blockerbport = self.blockerb.portno_start
            start_peer(self.blockerm)
            start_peer(self.blockerb)
            start_peer(self.mapperd)
            start_peer(self.vlmcd)
        except Exception as e:
            print(e)
            self._stop_peers()
            super(VlmcdTest, self).tearDown()
            # Bare raise keeps the traceback of the original failure.
            raise

    def tearDown(self):
        """Stop all peers and destroy the segment."""
        self._stop_peers()
        super(VlmcdTest, self).tearDown()

    def test_open(self):
        """Opening fails before the volume exists and succeeds (twice) after."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_open(self.vlmcdport, volume)
        self.send_and_evaluate_open(self.vlmcdport, volume)

    def test_close(self):
        """Closing succeeds only for a volume that is currently open."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
        self.send_and_evaluate_open(self.vlmcdport, volume)
        self.send_and_evaluate_close(self.vlmcdport, volume)
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)

    def test_info(self):
        """Info on a new volume reports the size it was created with."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        xinfo = self.get_reply_info(volsize)
        self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)

    def test_write_read(self):
        """I/O fails on a missing volume; written data is read back after creation."""
        datalen = 1024
        data = get_random_string(datalen, 16)
        volume = "myvolume"
        volsize = 10*1024*1024

        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                expected=False)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
                expected=False)
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                serviced=datalen)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
                expected_data=data)

    def test_clone_snapshot(self):
        """Snapshots and clones keep their own data as the volume changes."""
        volume = "myvolume"
        snap = "mysnapshot"
        snap2 = "mysnapshot3"
        clone1 = "myclone1"

        volsize = 100*1024*1024*1024
        clone2size = 200*1024*1024*1024
        offset = 90*1024*1024*1024
        size = 10*1024*1024

        zeros = '\x00' * size
        data = get_random_string(size, 16)
        data2 = get_random_string(size, 16)

        # A fresh volume reads back as zeros.
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=zeros)

        # Writes to the volume after the first snapshot do not reach it.
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
                serviced=size)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=data)

        # The second snapshot and a larger clone of it both see the data;
        # the area of the clone beyond the original size reads as zeros.
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
                clone_size=clone2size)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=volsize+offset, expected_data=zeros)

        # Writing to the clone leaves the snapshots and the volume untouched.
        self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
                                offset=offset, serviced=size)
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
                offset=offset, expected_data=data2)
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
                offset=offset, expected_data=zeros)

    def test_info2(self):
        """Two info requests in flight at once both report the volume size."""
        volume = "myvolume"
        volsize = 10*1024*1024
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        xinfo = self.get_reply_info(volsize)
        reqs = Set([])
        reqs.add(self.send_info(self.vlmcdport, volume))
        reqs.add(self.send_info(self.vlmcdport, volume))
        while len(reqs) > 0:
            req = self.xseg.wait_requests(reqs)
            self.evaluate_req(req, data=xinfo)
            reqs.remove(req)

    def test_hash(self):
        """The hash of a snapshot equals the Merkle hash of its block hashes.

        The reported hash is also usable as a content-addressed clone source,
        but only when the clone request sets ``cont_addr``.
        """
        blocksize = self.blocksize
        volume = "myvolume"
        volume2 = "myvolume2"
        snap = "snapshot"
        volsize = 10*1024*1024
        size = 512*1024
        offset = 0
        data = get_random_string(size, 16)

        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
                clone_size=volsize)
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
                                offset=offset, serviced=size)

        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)

        # Hashing the volume itself is expected to fail; the snapshot works.
        self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
                expected=False)
        req = self.send_hash(self.mapperdport, snap, size=volsize)
        req.wait()
        self.assertTrue(req.success())
        xreply = req.get_data(xseg_reply_hash).contents
        hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
        req.put()

        req = self.send_map_read(self.mapperdport, snap, offset=0,
                size=volsize)
        req.wait()
        self.assertTrue(req.success())
        xreply = req.get_data(xseg_reply_map).contents
        blocks = self.get_list_of_hashes(xreply, from_segment=True)
        req.put()

        # Blocks whose target is the hash of the empty string are used as
        # they are; every other block is hashed by the blocker.
        empty_hash = sha256('').hexdigest()
        h = []
        for b in blocks:
            if b == empty_hash:
                h.append(unhexlify(b))
                continue
            req = self.send_hash(self.blockerbport, b, size=blocksize)
            req.wait()
            self.assertTrue(req.success())
            xreply = req.get_data(xseg_reply_hash).contents
            h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
            req.put()

        mh = hexlify(merkle_hash(h))
        self.assertEqual(hash_map, mh)

        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
                clone_size=volsize * 2, expected=False)
        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
                clone_size=volsize * 2, cont_addr=True)
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
                offset=offset, expected_data=data)
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
                offset=offset + size, expected_data='\x00' * (volsize - size))
688
689
690 class MapperdTest(XsegTest):
691     bfiled_args = {
692             'role': 'mappertest-blockerb',
693             'spec': XsegTest.spec,
694             'nr_ops': 16,
695             'archip_dir': '/tmp/bfiledtest/',
696             'prefix': 'archip_',
697             'portno_start': 0,
698             'portno_end': 0,
699             'daemon': True,
700             'log_level': 3,
701             }
702     mfiled_args = {
703             'role': 'mappertest-blockerm',
704             'spec': XsegTest.spec,
705             'nr_ops': 16,
706             'archip_dir': '/tmp/mfiledtest/',
707             'prefix': 'archip_',
708             'portno_start': 1,
709             'portno_end': 1,
710             'daemon': True,
711             'log_level': 3,
712             }
713     mapperd_args = {
714             'role': 'mappertest-mapper',
715             'spec': XsegTest.spec,
716             'nr_ops': 16,
717             'portno_start': 2,
718             'portno_end': 2,
719             'daemon': True,
720             'log_level': 3,
721             'blockerb_port': 0,
722             'blockerm_port': 1,
723             }
724     blocksize = 4*1024*1024
725
726     def setUp(self):
727         super(MapperdTest, self).setUp()
728         try:
729             self.blockerm = self.get_filed(self.mfiled_args, clean=True)
730             self.blockerb = self.get_filed(self.bfiled_args, clean=True)
731             self.mapperd = self.get_mapperd(self.mapperd_args)
732             self.mapperdport = self.mapperd.portno_start
733             start_peer(self.blockerm)
734             start_peer(self.blockerb)
735             start_peer(self.mapperd)
736         except Exception as e:
737             print e
738             stop_peer(self.mapperd)
739             stop_peer(self.blockerb)
740             stop_peer(self.blockerm)
741             super(MapperdTest, self).tearDown()
742             raise e
743
744     def tearDown(self):
745         stop_peer(self.mapperd)
746         stop_peer(self.blockerb)
747         stop_peer(self.blockerm)
748         super(MapperdTest, self).tearDown()
749
750     def test_create(self):
751         volume = "myvolume"
752         volsize = 10*1024*1024
753         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
754                 clone_size=0, expected=False)
755         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
756                 clone_size=volsize)
757         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
758                 clone_size=volsize, expected=False)
759
760     def test_delete(self):
761         volume = "myvolume"
762         volsize = 10*1024*1024
763         offset = 0
764         size = 10
765         epoch = 2
766
767         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
768                 clone_size=0, expected=False)
769         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
770                 clone_size=volsize)
771         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
772                 clone_size=volsize, expected=False)
773         self.send_and_evaluate_delete(self.mapperdport, volume)
774         self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
775         self.send_and_evaluate_map_write(self.mapperdport, volume,
776                 offset=offset, size=size, expected=False)
777         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
778                 clone_size=volsize)
779
780         ret = self.get_copy_map_reply(volume, offset, size, epoch)
781
782         self.send_and_evaluate_map_write(self.mapperdport, volume,
783                 expected_data=ret, offset=offset, size=size)
784
785     def test_clone_snapshot(self):
786         volume = "myvolume"
787         snap = "mysnapshot"
788         snap2 = "mysnapshot2"
789         snap2 = "mysnapshot3"
790         clone1 = "myclone1"
791         clone2 = "myclone2"
792         volsize = 100*1024*1024*1024
793         clone2size = 200*1024*1024*1024
794         offset = 90*1024*1024*1024
795         size = 10*1024*1024
796         offset = 0
797         size = volsize
798         epoch = 2
799
800         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
801                 clone_size=volsize)
802         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
803         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
804                 expected=False)
805         xinfo = self.get_reply_info(volsize)
806         self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
807         ret = self.get_zero_map_reply(offset, size)
808         self.send_and_evaluate_map_read(self.mapperdport, volume,
809                 expected_data=ret, offset=offset, size=size)
810         self.send_and_evaluate_map_read(self.mapperdport, snap,
811                 expected_data=ret, offset=offset, size=size)
812
813         ret = self.get_copy_map_reply(volume, offset, size, epoch)
814         self.send_and_evaluate_map_write(self.mapperdport, volume,
815                 expected_data=ret, offset=offset, size=size)
816
817         stop_peer(self.mapperd)
818         start_peer(self.mapperd)
819         self.send_and_evaluate_map_read(self.mapperdport, volume,
820                 expected_data=ret, offset=offset, size=size)
821
822         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
823         xinfo = self.get_reply_info(volsize)
824         self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
825         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
826                 clone_size=2, expected=False)
827         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
828                 clone_size=clone2size)
829         xinfo = self.get_reply_info(clone2size)
830         self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
831
832     def test_info(self):
833         volume = "myvolume"
834         volsize = 10*1024*1024
835         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
836                 clone_size=volsize)
837         xinfo = self.get_reply_info(volsize)
838         self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
839
840     def test_info2(self):
841         volume = "myvolume"
842         volsize = 10*1024*1024
843         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
844                 clone_size=volsize)
845         xinfo = self.get_reply_info(volsize)
846         reqs = Set([])
847         reqs.add(self.send_info(self.mapperdport, volume))
848         reqs.add(self.send_info(self.mapperdport, volume))
849         while len(reqs) > 0:
850             req = self.xseg.wait_requests(reqs)
851             self.evaluate_req(req, data=xinfo)
852             reqs.remove(req)
853
854     def test_open(self):
855         volume = "myvolume"
856         volsize = 10*1024*1024
857         self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
858         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
859                 clone_size=volsize)
860         self.send_and_evaluate_open(self.mapperdport, volume)
861         self.send_and_evaluate_open(self.mapperdport, volume)
862
863     def test_open2(self):
864         volume = "myvolume"
865         volsize = 10*1024*1024
866         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
867                 clone_size=volsize)
868         reqs = Set([])
869         reqs.add(self.send_open(self.mapperdport, volume))
870         reqs.add(self.send_open(self.mapperdport, volume))
871         while len(reqs) > 0:
872             req = self.xseg.wait_requests(reqs)
873             self.evaluate_req(req)
874             reqs.remove(req)
875
876     def test_close(self):
877         volume = "myvolume"
878         volsize = 10*1024*1024
879         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
880         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
881                 clone_size=volsize)
882         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
883         self.send_and_evaluate_open(self.mapperdport, volume)
884         self.send_and_evaluate_close(self.mapperdport, volume)
885         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
886
887     def test_mapr(self):
888         volume = "myvolume"
889         volsize = 10*1024*1024
890         offset = 0
891         size = volsize
892         ret = MapperdTest.get_zero_map_reply(offset, size)
893         self.send_and_evaluate_map_read(self.mapperdport, volume,
894                 offset=offset, size=size, expected=False)
895         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
896                 clone_size=volsize)
897         self.send_and_evaluate_map_read(self.mapperdport, volume,
898                 expected_data=ret, offset=offset, size=size)
899         offset = volsize - 1
900         self.send_and_evaluate_map_read(self.mapperdport, volume,
901                 offset=offset, size=size, expected=False)
902         offset = volsize + 1
903         self.send_and_evaluate_map_read(self.mapperdport, volume,
904                 offset=offset, size=size, expected=False)
905
906     def test_mapr2(self):
907         volume = "myvolume"
908         volsize = 10*1024*1024
909         offset = 0
910         size = volsize
911
912         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
913                 clone_size=volsize)
914         reqs = Set([])
915         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
916             size=size))
917         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
918             size=size))
919         ret = MapperdTest.get_zero_map_reply(offset, size)
920         while len(reqs) > 0:
921             req = self.xseg.wait_requests(reqs)
922             self.evaluate_req(req, data=ret)
923             reqs.remove(req)
924
925     def test_mapw(self):
926         blocksize = self.blocksize
927         volume = "myvolume"
928         volsize = 100*1024*1024*1024
929         offset = 90*1024*1024*1024 - 2
930         size = 512*1024
931         epoch = 1
932
933         ret = self.get_copy_map_reply(volume, offset, size, epoch)
934
935         self.send_and_evaluate_map_write(self.mapperdport, volume,
936                 offset=offset, size=size, expected=False)
937         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
938                 clone_size=volsize)
939         self.send_and_evaluate_map_write(self.mapperdport, volume,
940                 expected_data=ret, offset=offset, size=size)
941         self.send_and_evaluate_map_read(self.mapperdport, volume,
942                 expected_data = ret, offset=offset, size=size)
943         stop_peer(self.mapperd)
944         start_peer(self.mapperd)
945         self.send_and_evaluate_map_read(self.mapperdport, volume,
946                 expected_data=ret, offset=offset, size=size)
947         self.send_and_evaluate_open(self.mapperdport, volume)
948         offset = 101*1024*1024*1024
949         self.send_and_evaluate_map_write(self.mapperdport, volume,
950                 offset=offset, size=size, expected=False)
951         offset = 100*1024*1024*1024 - 1
952         self.send_and_evaluate_map_write(self.mapperdport, volume,
953                 offset=offset, size=size, expected=False)
954
955     def test_mapw2(self):
956         blocksize = self.blocksize
957         volume = "myvolume"
958         volsize = 100*1024*1024*1024
959         offset = 90*1024*1024*1024 - 2
960         size = 512*1024
961         epoch = 1
962
963         ret = self.get_copy_map_reply(volume, offset, size, epoch)
964
965         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
966                 clone_size=volsize)
967
968         reqs = Set([])
969         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
970             size=size))
971         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
972             size=size))
973         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
974             size=size))
975         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
976             size=size))
977         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
978             size=size))
979         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
980             size=size))
981         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
982             size=size))
983         while len(reqs) > 0:
984             req = self.xseg.wait_requests(reqs)
985             self.evaluate_req(req, data=ret)
986             reqs.remove(req)
987
class BlockerTest(object):
    """Test cases shared by the blocker peers.

    The class is a mixin: it reads ``self.blocker`` and ``self.blockerport``
    and calls ``send_and_evaluate_*`` / ``get_*`` helpers that it does not
    define itself, so the concrete test class has to provide them.
    """

    def test_write_read(self):
        """Written data and its reported size must survive a blocker
        restart."""
        nbytes = 1024
        payload = get_random_string(nbytes, 16)
        name = "mytarget"
        expected_info = self.get_reply_info(nbytes)

        def check_target():
            # Read the data back, then compare the info reply.
            self.send_and_evaluate_read(self.blockerport, name, size=nbytes,
                                        expected_data=payload)
            self.send_and_evaluate_info(self.blockerport, name,
                                        expected_data=expected_info)

        self.send_and_evaluate_write(self.blockerport, name, data=payload,
                                     serviced=nbytes)
        check_target()
        stop_peer(self.blocker)
        start_peer(self.blocker)
        check_target()

    def test_info(self):
        """The info reply for a written target is compared to its length."""
        nbytes = 1024
        payload = get_random_string(nbytes, 16)
        name = "mytarget"
        self.send_and_evaluate_write(self.blockerport, name, data=payload,
                                     serviced=nbytes)
        expected_info = self.get_reply_info(nbytes)
        self.send_and_evaluate_info(self.blockerport, name,
                                    expected_data=expected_info)

    def test_copy(self):
        """Copy a written target and read the same data from the copy."""
        nbytes = 1024
        payload = get_random_string(nbytes, 16)
        src = "mytarget"
        dst = "copy_target"

        self.send_and_evaluate_write(self.blockerport, src, data=payload,
                                     serviced=nbytes)
        self.send_and_evaluate_read(self.blockerport, src, size=nbytes,
                                    expected_data=payload, serviced=nbytes)
        self.send_and_evaluate_copy(self.blockerport, src, dst_target=dst,
                                    size=nbytes, serviced=nbytes)
        self.send_and_evaluate_read(self.blockerport, dst, size=nbytes,
                                    expected_data=payload)

    def test_delete(self):
        """Delete fails on a missing target; after deleting an existing
        one, a read is compared against zero bytes."""
        nbytes = 1024
        payload = get_random_string(nbytes, 16)
        name = "mytarget"

        # Nothing has been written yet.
        self.send_and_evaluate_delete(self.blockerport, name, False)
        self.send_and_evaluate_write(self.blockerport, name, data=payload)
        self.send_and_evaluate_read(self.blockerport, name, size=nbytes,
                                    expected_data=payload)
        self.send_and_evaluate_delete(self.blockerport, name, True)
        # After the delete, the read is checked against a zero-filled buffer.
        zeros = '\x00' * nbytes
        self.send_and_evaluate_read(self.blockerport, name, size=nbytes,
                                    expected_data=zeros)

    def test_hash(self):
        """Hash an all-zero target and a random target; the random target
        is hashed three times with the same expected reply."""
        nbytes = 1024

        def expected_hash_reply(buf):
            # The expected digest is taken over the data with trailing NUL
            # bytes stripped.
            return self.get_hash_reply(sha256(buf.rstrip('\x00')).hexdigest())

        # All-zero content.
        name = "target_zeros"
        payload = '\x00' * nbytes
        self.send_and_evaluate_write(self.blockerport, name, data=payload,
                                     serviced=nbytes)
        reply = expected_hash_reply(payload)
        self.send_and_evaluate_hash(self.blockerport, name, size=nbytes,
                                    expected_data=reply, serviced=nbytes)

        # Random content, hashed repeatedly.
        name = "mytarget"
        payload = get_random_string(nbytes, 16)
        self.send_and_evaluate_write(self.blockerport, name, data=payload,
                                     serviced=nbytes)
        reply = expected_hash_reply(payload)
        for _ in range(3):
            self.send_and_evaluate_hash(self.blockerport, name, size=nbytes,
                                        expected_data=reply, serviced=nbytes)

    def test_locking(self):
        """Acquire/release sequence on one target, including a blocker
        restart while the lock is held."""
        name = "mytarget"
        port = self.blockerport
        acquire = self.send_and_evaluate_acquire
        release = self.send_and_evaluate_release

        acquire(port, name, expected=True)
        acquire(port, name, expected=True)
        release(port, name, expected=True)
        # Already released.
        release(port, name, expected=False)
        acquire(port, name, expected=True)

        # Restart the blocker while the lock is still held.
        stop_peer(self.blocker)
        start_peer(self.blocker)

        # After the restart neither acquire nor a plain release is expected
        # to succeed; only a forced release is.
        acquire(port, name, expected=False)
        release(port, name, expected=False)
        release(port, name, force=True, expected=True)
1080
1081
class FiledTest(BlockerTest, XsegTest):
    """Run the shared BlockerTest cases against a filed peer."""

    # Keyword arguments used to build the filed peer under test.
    filed_args = {
            'role': 'testfiled',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'archip_dir': '/tmp/filedtest/',
            'prefix': 'archip_',
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            }

    def setUp(self):
        """Run the base setUp, then create and start the filed blocker.

        If creating or starting the peer raises, the base tearDown is run
        and the exception is propagated to the caller.
        """
        super(FiledTest, self).setUp()
        try:
            self.blocker = self.get_filed(self.filed_args, clean=True)
            self.blockerport = self.blocker.portno_start
            start_peer(self.blocker)
        except Exception:
            super(FiledTest, self).tearDown()
            # A bare raise re-raises the active exception with its original
            # traceback; "raise e" would restart the traceback at this line
            # under Python 2.
            raise

    def tearDown(self):
        """Stop the blocker, then run the base tearDown."""
        stop_peer(self.blocker)
        super(FiledTest, self).tearDown()

    def test_locking(self):
        """Filed-specific variant of BlockerTest.test_locking.

        Instead of restarting the same peer object, the blocker is stopped
        and replaced by a new Filed built from a copy of ``filed_args`` with
        an extra ``unique_str`` entry.
        """
        target = "mytarget"
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        self.send_and_evaluate_release(self.blockerport, target, expected=True)
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
        stop_peer(self.blocker)
        # NOTE(review): presumably the different unique_str makes the new
        # peer look like a different lock owner -- confirm in Filed.
        new_filed_args = copy(self.filed_args)
        new_filed_args['unique_str'] = 'ThisisSparta'
        self.blocker = Filed(**new_filed_args)
        start_peer(self.blocker)
        # With the lock still held from before the restart, only a forced
        # release is expected to succeed.
        self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
        self.send_and_evaluate_release(self.blockerport, target, force=True,
                expected=True)
1125
class SosdTest(BlockerTest, XsegTest):
    """Run the shared BlockerTest cases against a sosd peer."""

    # Keyword arguments passed to get_sosd(). The attribute keeps the name
    # ``filed_args`` even though it configures a sosd peer.
    filed_args = {
            'role': 'testsosd',
            'spec': XsegTest.spec,
            'nr_ops': 16,
            'portno_start': 0,
            'portno_end': 0,
            'daemon': True,
            'log_level': 3,
            'pool': 'test_sosd',
            'nr_threads': 3,
            }

    def setUp(self):
        """Run the base setUp, then create and start the sosd blocker.

        If creating or starting the peer raises, the base tearDown is run
        and the exception is propagated to the caller.
        """
        super(SosdTest, self).setUp()
        try:
            self.blocker = self.get_sosd(self.filed_args, clean=True)
            self.blockerport = self.blocker.portno_start
            start_peer(self.blocker)
        except Exception:
            super(SosdTest, self).tearDown()
            # A bare raise re-raises the active exception with its original
            # traceback; "raise e" would restart the traceback at this line
            # under Python 2.
            raise

    def tearDown(self):
        """Stop the blocker, then run the base tearDown."""
        stop_peer(self.blocker)
        super(SosdTest, self).tearDown()
1152
if __name__ == '__main__':
    # Script entry point: call the module's init() first, then hand control
    # to the unittest runner.
    init()
    unittest.main()