Strip xseg stuff
[archipelago] / tools / qa / tests.py
1 # Copyright 2013 GRNET S.A. All rights reserved.
2 #
3 # Redistribution and use in source and binary forms, with or
4 # without modification, are permitted provided that the following
5 # conditions are met:
6 #
7 #   1. Redistributions of source code must retain the above
8 #      copyright notice, this list of conditions and the following
9 #      disclaimer.
10 #
11 #   2. Redistributions in binary form must reproduce the above
12 #      copyright notice, this list of conditions and the following
13 #      disclaimer in the documentation and/or other materials
14 #      provided with the distribution.
15 #
16 # THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17 # OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18 # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23 # USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24 # AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25 # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26 # ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 #
29 # The views and conclusions contained in the software and
30 # documentation are those of the authors and should not be
31 # interpreted as representing official policies, either expressed
32 # or implied, of GRNET S.A.
33
34 import archipelago
35 from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
36         Error, Segment
37 from archipelago.archipelago import start_peer, stop_peer
38 import random as rnd
39 import unittest2 as unittest
40 from xseg.xprotocol import *
41 from xseg.xseg_api import *
42 import ctypes
43 import os
44 from copy import copy
45 from sets import Set
46 from binascii import hexlify, unhexlify
47 from hashlib import sha256
48
49 def get_random_string(length=64, repeat=16):
50     nr_repeats = length//repeat
51
52     l = []
53     for i in range(repeat):
54         l.append(chr(ord('a') + rnd.randint(0,25)))
55     random_string = ''.join(l)
56
57     l = []
58     for i in range(nr_repeats):
59         l.append(random_string)
60     rem = length % repeat
61     l.append(random_string[0:rem])
62
63     return ''.join(l)
64
65 def recursive_remove(path):
66     for root, dirs, files in os.walk(path, topdown=False):
67         for name in files:
68             os.remove(os.path.join(root, name))
69         for name in dirs:
70             os.rmdir(os.path.join(root, name))
71
72 def merkle_hash(hashes):
73     if len(hashes) == 0:
74         return sha256('').digest()
75     if len(hashes) == 1:
76         return hashes[0]
77
78     s = 2
79     while s < len(hashes):
80         s = s * 2
81     hashes += [('\x00' * len(hashes[0]))] * (s - len(hashes))
82     while len(hashes) > 1 :
83         hashes = [sha256(hashes[i] + hashes[i + 1]).digest() for i in range (0, len(hashes), 2)]
84     return hashes[0]
85     
86
87 def init():
88     rnd.seed()
89 #    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
90     archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
91     archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
92     if not os.path.isdir(archipelago.common.LOGS_PATH):
93         os.makedirs(archipelago.common.LOGS_PATH)
94     if not os.path.isdir(archipelago.common.PIDFILE_PATH):
95         os.makedirs(archipelago.common.PIDFILE_PATH)
96
97     recursive_remove(archipelago.common.LOGS_PATH)
98
99 class XsegTest(unittest.TestCase):
100     spec = "posix:testsegment:8:16:256:12".encode()
101     blocksize = 4*1024*1024
102     segment = None
103
104     def setUp(self):
105         self.segment = Segment('posix', 'testsegment', 8, 16, 256, 12)
106         try:
107             self.segment.create()
108         except Exception as e:
109             self.segment.destroy()
110             self.segment.create()
111         self.xseg = Xseg_ctx(self.segment)
112
113     def tearDown(self):
114         if self.xseg:
115             self.xseg.shutdown()
116         if self.segment:
117             self.segment.destroy()
118
119     @staticmethod
120     def get_reply_info(size):
121         xinfo = xseg_reply_info()
122         xinfo.size = size
123         return xinfo
124
125     @staticmethod
126     def get_hash_reply(hashstring):
127         xhash = xseg_reply_hash()
128         xhash.target = hashstring
129         xhash.targetlen = len(hashstring)
130         return xhash
131
132     @staticmethod
133     def get_object_name(volume, epoch, index):
134         epoch_64 = ctypes.c_uint64(epoch)
135         index_64 = ctypes.c_uint64(index)
136         epoch_64_char = ctypes.cast(ctypes.addressof(epoch_64), ctypes.c_char_p)
137         index_64_char = ctypes.cast(ctypes.addressof(index_64), ctypes.c_char_p)
138         epoch_64_str = ctypes.string_at(epoch_64_char, ctypes.sizeof(ctypes.c_uint64))
139         index_64_str = ctypes.string_at(index_64_char, ctypes.sizeof(ctypes.c_uint64))
140         epoch_hex = hexlify(epoch_64_str)
141         index_hex = hexlify(index_64_str)
142         return "archip_" + volume + "_" + epoch_hex + "_" + index_hex
143
144     @staticmethod
145     def get_map_reply(offset, size):
146         blocksize = XsegTest.blocksize
147         ret = xseg_reply_map()
148         cnt = (offset+size)//blocksize - offset//blocksize
149         if (offset+size) % blocksize > 0 :
150             cnt += 1
151         ret.cnt = cnt
152         SegsArray = xseg_reply_map_scatterlist * cnt
153         segs = SegsArray()
154         rem_size = size
155         offset = offset % blocksize
156         for i in range(0, cnt):
157             segs[i].offset = offset
158             segs[i].size = blocksize - offset
159             if segs[i].size > rem_size:
160                 segs[i].size = rem_size
161             offset = 0
162             rem_size -= segs[i].size
163             if rem_size < 0 :
164                 raise Error("Calculation error")
165         ret.segs = segs
166
167         return ret
168
169     @staticmethod
170     def get_list_of_hashes(xreply, from_segment=False):
171         hashes = []
172         cnt = xreply.cnt
173         segs = xreply.segs
174         if from_segment:
175             SegsArray = xseg_reply_map_scatterlist * cnt
176             array = SegsArray.from_address(ctypes.addressof(segs))
177             segs = array
178         for i in range(0, cnt):
179             hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
180         return hashes
181
182     @staticmethod
183     def get_zero_map_reply(offset, size):
184         ret = XsegTest.get_map_reply(offset, size);
185         cnt = ret.cnt
186         for i in range(0, cnt):
187             ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
188             ret.segs[i].targetlen = len(ret.segs[i].target)
189         return ret
190
191     @staticmethod
192     def get_copy_map_reply(volume, offset, size, epoch):
193         blocksize = XsegTest.blocksize
194         objidx_start = offset//blocksize
195         ret = XsegTest.get_map_reply(offset, size);
196         cnt = ret.cnt
197         for i in range(0, cnt):
198             ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
199                     objidx_start+i)
200             ret.segs[i].targetlen = len(ret.segs[i].target)
201         return ret
202
203     def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
204             flags=0):
205         return Request(self.xseg, dst, target, data=data, size=size,
206                 offset=offset, datalen=datalen, flags=flags, op=op)
207
208     def assert_equal_xseg(self, req, expected_data):
209         if isinstance(expected_data, xseg_reply_info):
210             datasize = ctypes.sizeof(expected_data)
211             self.assertEqual(datasize, req.get_datalen())
212             data = req.get_data(type(expected_data)).contents
213             self.assertEqual(data.size, expected_data.size)
214         elif isinstance(expected_data, xseg_reply_map):
215             #since xseg_reply_map uses a flexible array for the
216             #xseg_reply_map_scatterlist reply, we calculate the size of the
217             #reply in the segment, by subtracting the size of the pointer to
218             #the array, in the python object
219             datasize = ctypes.sizeof(expected_data)
220             datasize -= ctypes.sizeof(expected_data.segs)
221             datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
222             self.assertEqual(datasize, req.get_datalen())
223             data = req.get_data(type(expected_data)).contents
224             cnt = data.cnt
225             self.assertEqual(data.cnt, expected_data.cnt)
226             segs = data.segs
227             SegsArray = xseg_reply_map_scatterlist * cnt
228             array = SegsArray.from_address(ctypes.addressof(segs))
229             expected_array = expected_data.segs
230             for i in range(0, cnt):
231                 t = ctypes.string_at(array[i].target, array[i].targetlen)
232                 self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
233                 self.assertEqual(t, expected_array[i].target)
234                 self.assertEqual(array[i].offset, expected_array[i].offset)
235                 self.assertEqual(array[i].size, expected_array[i].size)
236         elif isinstance(expected_data, xseg_reply_hash):
237             datasize = ctypes.sizeof(expected_data)
238             self.assertEqual(datasize, req.get_datalen())
239             data = req.get_data(type(expected_data)).contents
240             self.assertEqual(data.targetlen, expected_data.targetlen)
241             t = ctypes.string_at(data.target, data.targetlen)
242             et = ctypes.string_at(expected_data.target, expected_data.targetlen)
243             self.assertEqual(t, et)
244         else:
245             raise Error("Unknown data type")
246
247     def evaluate_req(self, req, success=True, serviced=None, data=None):
248         if not success:
249             self.assertFalse(req.success())
250             return
251
252         self.assertTrue(req.success())
253         if serviced is not None:
254             self.assertEqual(req.get_serviced(), serviced)
255         if data is not None:
256             if isinstance(data, basestring):
257                 datalen = len(data)
258                 self.assertEqual(datalen, req.get_datalen())
259                 self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
260             else:
261                 self.assert_equal_xseg(req, data)
262
263     def evaluate(send_func):
264         def send_and_evaluate(self, dst, target, expected=True, serviced=None,
265                 expected_data=None, **kwargs):
266             req = send_func(self, dst, target, **kwargs)
267             req.wait()
268             self.evaluate_req(req, success=expected, serviced=serviced,
269                     data=expected_data)
270             self.assertTrue(req.put())
271         return send_and_evaluate
272
273     def send_write(self, dst, target, data=None, offset=0, datalen=0, flags=0):
274         #assert datalen >= size
275 #        req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
276         req = Request.get_write_request(self.xseg, dst, target, data=data,
277                 offset=offset, datalen=datalen, flags=flags)
278         req.submit()
279         return req
280
281     send_and_evaluate_write = evaluate(send_write)
282
283     def send_read(self, dst, target, size=0, datalen=0, offset=0):
284         #assert datalen >= size
285 #        req = self.get_req(X_READ, dst, target, data=None, size=size, offset=offset, datalen=datalen)
286         req = Request.get_read_request(self.xseg, dst, target, size=size,
287                 offset=offset, datalen=datalen)
288         req.submit()
289         return req
290
291     send_and_evaluate_read = evaluate(send_read)
292
293     def send_info(self, dst, target):
294         #req = self.get_req(X_INFO, dst, target, data=None, size=0)
295         req = Request.get_info_request(self.xseg, dst, target)
296         req.submit()
297         return req
298
299     send_and_evaluate_info = evaluate(send_info)
300
301     def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
302         #datalen = ctypes.sizeof(xseg_request_copy)
303         #xcopy = xseg_request_copy()
304         #xcopy.target = src_target
305         #xcopy.targetlen = len(src_target)
306 #        req = self.get_req(X_COPY, dst, dst_target, data=xcopy, datalen=datalen,
307 #                offset=offset, size=size)
308         req = Request.get_copy_request(self.xseg, dst, src_target,
309                 copy_target=dst_target, size=size, offset=offset)
310         req.submit()
311         return req
312
313     send_and_evaluate_copy = evaluate(send_copy)
314
315     def send_acquire(self, dst, target):
316         #req = self.get_req(X_ACQUIRE, dst, target, flags=XF_NOSYNC)
317         req = Request.get_acquire_request(self.xseg, dst, target)
318         req.submit()
319         return req
320
321     send_and_evaluate_acquire = evaluate(send_acquire)
322
323     def send_release(self, dst, target, force=False):
324         #req_flags = XF_NOSYNC
325         #if force:
326             #req_flags |= XF_FORCE
327         #req = self.get_req(X_RELEASE, dst, target, size=0, flags=req_flags)
328         req = Request.get_release_request(self.xseg, dst, target, force)
329         req.submit()
330         return req
331
332     send_and_evaluate_release = evaluate(send_release)
333
334     def send_delete(self, dst, target):
335         #req = self.get_req(X_DELETE, dst, target)
336         req = Request.get_delete_request(self.xseg, dst, target)
337         req.submit()
338         return req
339
340     send_and_evaluate_delete = evaluate(send_delete)
341
342     def send_clone(self, dst, src_target, clone=None, clone_size=0,
343             cont_addr=False):
344         #xclone = xseg_request_clone()
345         #xclone.target = src_target
346         #xclone.targetlen = len(src_target)
347         #xclone.size = clone_size
348
349         #req = self.get_req(X_CLONE, dst, clone, data=xclone,
350                 #datalen=ctypes.sizeof(xclone))
351         req = Request.get_clone_request(self.xseg, dst, src_target,
352                 clone=clone, clone_size=clone_size, cont_addr=cont_addr)
353         req.submit()
354         return req
355
356     send_and_evaluate_clone = evaluate(send_clone)
357
358     def send_snapshot(self, dst, src_target, snap=None):
359         #xsnapshot = xseg_request_snapshot()
360         #xsnapshot.target = snap
361         #xsnapshot.targetlen = len(snap)
362
363         #req = self.get_req(X_SNAPSHOT, dst, src_target, data=xsnapshot,
364                 #datalen=ctypes.sizeof(xsnapshot))
365         req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
366         req.submit()
367         return req
368
369     send_and_evaluate_snapshot = evaluate(send_snapshot)
370
371     def send_open(self, dst, target):
372         #req = self.get_req(X_OPEN, dst, target)
373         req = Request.get_open_request(self.xseg, dst, target)
374         req.submit()
375         return req
376
377     send_and_evaluate_open = evaluate(send_open)
378
379     def send_close(self, dst, target):
380         #req = self.get_req(X_CLOSE, dst, target)
381         req = Request.get_close_request(self.xseg, dst, target)
382         req.submit()
383         return req
384
385     send_and_evaluate_close = evaluate(send_close)
386
387     def send_map_read(self, dst, target, offset=0, size=0):
388         #req = self.get_req(X_MAPR, dst, target, size=size, offset=offset,
389                 #datalen=0)
390         req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
391                 size=size)
392         req.submit()
393         return req
394
395     send_and_evaluate_map_read = evaluate(send_map_read)
396
397     def send_map_write(self, dst, target, offset=0, size=0):
398         #req = self.get_req(X_MAPW, dst, target, size=size, offset=offset,
399                 #datalen=0)
400         req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
401                 size=size)
402         req.submit()
403         return req
404
405     send_and_evaluate_map_write = evaluate(send_map_write)
406
407     def send_hash(self, dst, target, size=0):
408         #req = self.get_req(X_hash, dst, target, data=None, size=0)
409         req = Request.get_hash_request(self.xseg, dst, target, size=size)
410         req.submit()
411         return req
412
413     send_and_evaluate_hash = evaluate(send_hash)
414
415     def get_filed(self, args, clean=False):
416         path = args['archip_dir']
417         if not os.path.exists(path):
418             os.makedirs(path)
419
420         if clean:
421             recursive_remove(path)
422
423         return Filed(**args)
424
425     def get_sosd(self, args, clean=False):
426         pool = args['pool']
427         import rados
428         cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
429         cluster.connect()
430         if cluster.pool_exists(pool):
431             cluster.delete_pool(pool)
432         cluster.create_pool(pool)
433
434         cluster.shutdown()
435         return Sosd(**args)
436
437     def get_mapperd(self, args):
438         return Mapperd(**args)
439
440     def get_vlmcd(self, args):
441         return Vlmcd(**args)
442
443 class VlmcdTest(XsegTest):
444     bfiled_args = {
445             'role': 'vlmctest-blockerb',
446             'spec': XsegTest.spec,
447             'nr_ops': 16,
448             'archip_dir': '/tmp/bfiledtest/',
449             'prefix': 'archip_',
450             'portno_start': 0,
451             'portno_end': 0,
452             'daemon': True,
453             'log_level': 3,
454             }
455     mfiled_args = {
456             'role': 'vlmctest-blockerm',
457             'spec': XsegTest.spec,
458             'nr_ops': 16,
459             'archip_dir': '/tmp/mfiledtest/',
460             'prefix': 'archip_',
461             'portno_start': 1,
462             'portno_end': 1,
463             'daemon': True,
464             'log_level': 3,
465             }
466     mapperd_args = {
467             'role': 'vlmctest-mapper',
468             'spec': XsegTest.spec,
469             'nr_ops': 16,
470             'portno_start': 2,
471             'portno_end': 2,
472             'daemon': True,
473             'log_level': 3,
474             'blockerb_port': 0,
475             'blockerm_port': 1,
476             }
477     vlmcd_args = {
478             'role': 'vlmctest-vlmc',
479             'spec': XsegTest.spec,
480             'nr_ops': 16,
481             'portno_start': 3,
482             'portno_end': 3,
483             'daemon': True,
484             'log_level': 3,
485             'blocker_port': 0,
486             'mapper_port': 2
487             }
488
489     def setUp(self):
490         super(VlmcdTest, self).setUp()
491         try:
492             self.blockerm = self.get_filed(self.mfiled_args, clean=True)
493             self.blockerb = self.get_filed(self.bfiled_args, clean=True)
494             self.mapperd = self.get_mapperd(self.mapperd_args)
495             self.vlmcd = self.get_vlmcd(self.vlmcd_args)
496             self.vlmcdport = self.vlmcd.portno_start
497             self.mapperdport = self.mapperd.portno_start
498             self.blockerbport = self.blockerb.portno_start
499             start_peer(self.blockerm)
500             start_peer(self.blockerb)
501             start_peer(self.mapperd)
502             start_peer(self.vlmcd)
503         except Exception as e:
504             print e
505             stop_peer(self.vlmcd)
506             stop_peer(self.mapperd)
507             stop_peer(self.blockerb)
508             stop_peer(self.blockerm)
509             super(VlmcdTest, self).tearDown()
510             raise e
511
512     def tearDown(self):
513         stop_peer(self.vlmcd)
514         stop_peer(self.mapperd)
515         stop_peer(self.blockerb)
516         stop_peer(self.blockerm)
517         super(VlmcdTest, self).tearDown()
518
519     def test_open(self):
520         volume = "myvolume"
521         volsize = 10*1024*1024
522         self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
523         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
524                 clone_size=volsize)
525         self.send_and_evaluate_open(self.vlmcdport, volume)
526         self.send_and_evaluate_open(self.vlmcdport, volume)
527
528     def test_close(self):
529         volume = "myvolume"
530         volsize = 10*1024*1024
531         self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
532         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
533                 clone_size=volsize)
534         self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
535         self.send_and_evaluate_open(self.vlmcdport, volume)
536         self.send_and_evaluate_close(self.vlmcdport, volume)
537         self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
538
539     def test_info(self):
540         volume = "myvolume"
541         volsize = 10*1024*1024
542         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
543                 clone_size=volsize)
544         xinfo = self.get_reply_info(volsize)
545         self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)
546
547     def test_write_read(self):
548         datalen = 1024
549         data = get_random_string(datalen, 16)
550         volume = "myvolume"
551         volsize = 10*1024*1024
552
553         self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
554                 expected=False)
555         self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
556                 expected=False)
557         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
558                 clone_size=volsize)
559         self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
560                 serviced=datalen)
561         self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
562                 expected_data=data)
563
564     def test_clone_snapshot(self):
565         volume = "myvolume"
566         snap = "mysnapshot"
567         snap2 = "mysnapshot2"
568         snap2 = "mysnapshot3"
569         clone1 = "myclone1"
570         clone2 = "myclone2"
571
572         volsize = 100*1024*1024*1024
573         clone2size = 200*1024*1024*1024
574         offset = 90*1024*1024*1024
575         size = 10*1024*1024
576
577         zeros = '\x00' * size
578         data = get_random_string(size, 16)
579         data2 = get_random_string(size, 16)
580
581         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
582                 clone_size=volsize)
583         self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
584                 offset=offset, expected_data=zeros)
585
586         self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
587         self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
588                 offset=offset, expected_data=zeros)
589         self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
590                 serviced=size)
591         self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
592                 offset=offset, expected_data=zeros)
593         self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
594                 offset=offset, expected_data=data)
595
596         self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
597         self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
598                 offset=offset, expected_data=data)
599         self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
600                 clone_size=clone2size)
601         self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
602                 offset=offset, expected_data=data)
603         self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
604                 offset=volsize+offset, expected_data=zeros)
605
606         self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
607                                 offset=offset, serviced=size)
608         self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
609                 offset=offset, expected_data=data2)
610         self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
611                 offset=offset, expected_data=data)
612         self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
613                 offset=offset, expected_data=data)
614         self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
615                 offset=offset, expected_data=zeros)
616
617     def test_info2(self):
618         volume = "myvolume"
619         volsize = 10*1024*1024
620         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
621                 clone_size=volsize)
622         xinfo = self.get_reply_info(volsize)
623         reqs = Set([])
624         reqs.add(self.send_info(self.vlmcdport, volume))
625         reqs.add(self.send_info(self.vlmcdport, volume))
626         while len(reqs) > 0:
627             req = self.xseg.wait_requests(reqs)
628             self.evaluate_req(req, data=xinfo)
629             reqs.remove(req)
630             self.assertTrue(req.put())
631
632     def test_flush(self):
633         datalen = 1024
634         data = get_random_string(datalen, 16)
635         volume = "myvolume"
636         volsize = 10*1024*1024
637
638         #This may seems weird, but actually vlmcd flush, only guarantees that
639         #there are no pending operation the volume. On a volume that does not
640         #exists, this is always true, so this should succeed.
641         self.send_and_evaluate_write(self.vlmcdport, volume, data="",
642                 flags=XF_FLUSH, expected=True)
643         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
644                 clone_size=volsize)
645         self.send_and_evaluate_write(self.vlmcdport, volume, data="",
646                 flags=XF_FLUSH)
647         self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
648                 serviced=datalen)
649         self.send_and_evaluate_write(self.vlmcdport, volume, data="",
650                 flags=XF_FLUSH)
651
652     def test_flush2(self):
653         volume = "myvolume"
654         volsize = 10*1024*1024
655         datalen = 1024
656         data = get_random_string(datalen, 16)
657
658         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
659                 clone_size=volsize)
660         xinfo = self.get_reply_info(volsize)
661         reqs = Set([])
662         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
663         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
664         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
665         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
666         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
667         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
668         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
669         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
670         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
671         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
672         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
673         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
674         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
675         reqs.add(self.send_write(self.vlmcdport, volume, data="", flags=XF_FLUSH))
676         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
677         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
678         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
679         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
680         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
681         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
682         while len(reqs) > 0:
683             req = self.xseg.wait_requests(reqs)
684             self.evaluate_req(req)
685             reqs.remove(req)
686             self.assertTrue(req.put())
687
688     def test_hash(self):
689         blocksize = self.blocksize
690         volume = "myvolume"
691         volume2 = "myvolume2"
692         snap = "snapshot"
693         clone = "clone"
694         volsize = 10*1024*1024
695         size = 512*1024
696         epoch = 1
697         offset = 0
698         data = get_random_string(size, 16)
699
700         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
701                 clone_size=volsize)
702         self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
703                                 offset=offset, serviced=size)
704
705         self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
706
707         self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
708                 expected=False)
709         req = self.send_hash(self.mapperdport, snap, size=volsize)
710         req.wait()
711         self.assertTrue(req.success())
712         xreply = req.get_data(xseg_reply_hash).contents
713         hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
714         req.put()
715
716         req = self.send_map_read(self.mapperdport, snap, offset=0,
717                 size=volsize)
718         req.wait()
719         self.assertTrue(req.success())
720         xreply = req.get_data(xseg_reply_map).contents
721         blocks = self.get_list_of_hashes(xreply, from_segment=True)
722         req.put()
723         h = []
724         for b in blocks:
725             if (b == sha256('').hexdigest()):
726                 h.append(unhexlify(b))
727                 continue
728             req = self.send_hash(self.blockerbport, b, size=blocksize)
729             req.wait()
730             self.assertTrue(req.success())
731             xreply = req.get_data(xseg_reply_hash).contents
732             h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
733             req.put()
734
735         mh = hexlify(merkle_hash(h))
736         self.assertEqual(hash_map, mh)
737
738         self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
739                 clone_size=volsize * 2, expected=False)
740         self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
741                 clone_size=volsize * 2, cont_addr=True)
742         self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
743                 offset=offset, expected_data=data)
744         self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
745                 offset=offset + size, expected_data='\x00' * (volsize - size))
746
747
748 class MapperdTest(XsegTest):
749     bfiled_args = {
750             'role': 'mappertest-blockerb',
751             'spec': XsegTest.spec,
752             'nr_ops': 16,
753             'archip_dir': '/tmp/bfiledtest/',
754             'prefix': 'archip_',
755             'portno_start': 0,
756             'portno_end': 0,
757             'daemon': True,
758             'log_level': 3,
759             }
760     mfiled_args = {
761             'role': 'mappertest-blockerm',
762             'spec': XsegTest.spec,
763             'nr_ops': 16,
764             'archip_dir': '/tmp/mfiledtest/',
765             'prefix': 'archip_',
766             'portno_start': 1,
767             'portno_end': 1,
768             'daemon': True,
769             'log_level': 3,
770             }
771     mapperd_args = {
772             'role': 'mappertest-mapper',
773             'spec': XsegTest.spec,
774             'nr_ops': 16,
775             'portno_start': 2,
776             'portno_end': 2,
777             'daemon': True,
778             'log_level': 3,
779             'blockerb_port': 0,
780             'blockerm_port': 1,
781             }
782     blocksize = 4*1024*1024
783
784     def setUp(self):
785         super(MapperdTest, self).setUp()
786         try:
787             self.blockerm = self.get_filed(self.mfiled_args, clean=True)
788             self.blockerb = self.get_filed(self.bfiled_args, clean=True)
789             self.mapperd = self.get_mapperd(self.mapperd_args)
790             self.mapperdport = self.mapperd.portno_start
791             start_peer(self.blockerm)
792             start_peer(self.blockerb)
793             start_peer(self.mapperd)
794         except Exception as e:
795             print e
796             stop_peer(self.mapperd)
797             stop_peer(self.blockerb)
798             stop_peer(self.blockerm)
799             super(MapperdTest, self).tearDown()
800             raise e
801
802     def tearDown(self):
803         stop_peer(self.mapperd)
804         stop_peer(self.blockerb)
805         stop_peer(self.blockerm)
806         super(MapperdTest, self).tearDown()
807
808     def test_create(self):
809         volume = "myvolume"
810         volsize = 10*1024*1024
811         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
812                 clone_size=0, expected=False)
813         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
814                 clone_size=volsize)
815         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
816                 clone_size=volsize, expected=False)
817
818     def test_delete(self):
819         volume = "myvolume"
820         volsize = 10*1024*1024
821         offset = 0
822         size = 10
823         epoch = 2
824
825         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
826                 clone_size=0, expected=False)
827         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
828                 clone_size=volsize)
829         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
830                 clone_size=volsize, expected=False)
831         self.send_and_evaluate_delete(self.mapperdport, volume)
832         self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
833         self.send_and_evaluate_map_write(self.mapperdport, volume,
834                 offset=offset, size=size, expected=False)
835         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
836                 clone_size=volsize)
837
838         ret = self.get_copy_map_reply(volume, offset, size, epoch)
839
840         self.send_and_evaluate_map_write(self.mapperdport, volume,
841                 expected_data=ret, offset=offset, size=size)
842
843     def test_clone_snapshot(self):
844         volume = "myvolume"
845         snap = "mysnapshot"
846         snap2 = "mysnapshot2"
847         snap2 = "mysnapshot3"
848         clone1 = "myclone1"
849         clone2 = "myclone2"
850         volsize = 100*1024*1024*1024
851         clone2size = 200*1024*1024*1024
852         offset = 90*1024*1024*1024
853         size = 10*1024*1024
854         offset = 0
855         size = volsize
856         epoch = 2
857
858         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
859                 clone_size=volsize)
860         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
861         self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
862                 expected=False)
863         xinfo = self.get_reply_info(volsize)
864         self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
865         ret = self.get_zero_map_reply(offset, size)
866         self.send_and_evaluate_map_read(self.mapperdport, volume,
867                 expected_data=ret, offset=offset, size=size)
868         self.send_and_evaluate_map_read(self.mapperdport, snap,
869                 expected_data=ret, offset=offset, size=size)
870
871         ret = self.get_copy_map_reply(volume, offset, size, epoch)
872         self.send_and_evaluate_map_write(self.mapperdport, volume,
873                 expected_data=ret, offset=offset, size=size)
874
875         stop_peer(self.mapperd)
876         start_peer(self.mapperd)
877         self.send_and_evaluate_map_read(self.mapperdport, volume,
878                 expected_data=ret, offset=offset, size=size)
879
880         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
881         xinfo = self.get_reply_info(volsize)
882         self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
883         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
884                 clone_size=2, expected=False)
885         self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
886                 clone_size=clone2size)
887         xinfo = self.get_reply_info(clone2size)
888         self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
889
890     def test_info(self):
891         volume = "myvolume"
892         volsize = 10*1024*1024
893         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
894                 clone_size=volsize)
895         xinfo = self.get_reply_info(volsize)
896         self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
897
898     def test_info2(self):
899         volume = "myvolume"
900         volsize = 10*1024*1024
901         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
902                 clone_size=volsize)
903         xinfo = self.get_reply_info(volsize)
904         reqs = Set([])
905         reqs.add(self.send_info(self.mapperdport, volume))
906         reqs.add(self.send_info(self.mapperdport, volume))
907         while len(reqs) > 0:
908             req = self.xseg.wait_requests(reqs)
909             self.evaluate_req(req, data=xinfo)
910             reqs.remove(req)
911             self.assertTrue(req.put())
912
913     def test_open(self):
914         volume = "myvolume"
915         volsize = 10*1024*1024
916         self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
917         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
918                 clone_size=volsize)
919         self.send_and_evaluate_open(self.mapperdport, volume)
920         self.send_and_evaluate_open(self.mapperdport, volume)
921
922     def test_open2(self):
923         volume = "myvolume"
924         volsize = 10*1024*1024
925         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
926                 clone_size=volsize)
927         reqs = Set([])
928         reqs.add(self.send_open(self.mapperdport, volume))
929         reqs.add(self.send_open(self.mapperdport, volume))
930         while len(reqs) > 0:
931             req = self.xseg.wait_requests(reqs)
932             self.evaluate_req(req)
933             reqs.remove(req)
934             self.assertTrue(req.put())
935
936     def test_close(self):
937         volume = "myvolume"
938         volsize = 10*1024*1024
939         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
940         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
941                 clone_size=volsize)
942         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
943         self.send_and_evaluate_open(self.mapperdport, volume)
944         self.send_and_evaluate_close(self.mapperdport, volume)
945         self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
946
947     def test_mapr(self):
948         volume = "myvolume"
949         volsize = 10*1024*1024
950         offset = 0
951         size = volsize
952         ret = MapperdTest.get_zero_map_reply(offset, size)
953         self.send_and_evaluate_map_read(self.mapperdport, volume,
954                 offset=offset, size=size, expected=False)
955         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
956                 clone_size=volsize)
957         self.send_and_evaluate_map_read(self.mapperdport, volume,
958                 expected_data=ret, offset=offset, size=size)
959         offset = volsize - 1
960         self.send_and_evaluate_map_read(self.mapperdport, volume,
961                 offset=offset, size=size, expected=False)
962         offset = volsize + 1
963         self.send_and_evaluate_map_read(self.mapperdport, volume,
964                 offset=offset, size=size, expected=False)
965
966     def test_mapr2(self):
967         volume = "myvolume"
968         volsize = 10*1024*1024
969         offset = 0
970         size = volsize
971
972         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
973                 clone_size=volsize)
974         reqs = Set([])
975         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
976             size=size))
977         reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
978             size=size))
979         ret = MapperdTest.get_zero_map_reply(offset, size)
980         while len(reqs) > 0:
981             req = self.xseg.wait_requests(reqs)
982             self.evaluate_req(req, data=ret)
983             reqs.remove(req)
984             self.assertTrue(req.put())
985
986     def test_mapw(self):
987         blocksize = self.blocksize
988         volume = "myvolume"
989         volsize = 100*1024*1024*1024
990         offset = 90*1024*1024*1024 - 2
991         size = 512*1024
992         epoch = 1
993
994         ret = self.get_copy_map_reply(volume, offset, size, epoch)
995
996         self.send_and_evaluate_map_write(self.mapperdport, volume,
997                 offset=offset, size=size, expected=False)
998         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
999                 clone_size=volsize)
1000         self.send_and_evaluate_map_write(self.mapperdport, volume,
1001                 expected_data=ret, offset=offset, size=size)
1002         self.send_and_evaluate_map_read(self.mapperdport, volume,
1003                 expected_data = ret, offset=offset, size=size)
1004         stop_peer(self.mapperd)
1005         start_peer(self.mapperd)
1006         self.send_and_evaluate_map_read(self.mapperdport, volume,
1007                 expected_data=ret, offset=offset, size=size)
1008         self.send_and_evaluate_open(self.mapperdport, volume)
1009         offset = 101*1024*1024*1024
1010         self.send_and_evaluate_map_write(self.mapperdport, volume,
1011                 offset=offset, size=size, expected=False)
1012         offset = 100*1024*1024*1024 - 1
1013         self.send_and_evaluate_map_write(self.mapperdport, volume,
1014                 offset=offset, size=size, expected=False)
1015
1016     def test_mapw2(self):
1017         blocksize = self.blocksize
1018         volume = "myvolume"
1019         volsize = 100*1024*1024*1024
1020         offset = 90*1024*1024*1024 - 2
1021         size = 512*1024
1022         epoch = 1
1023
1024         ret = self.get_copy_map_reply(volume, offset, size, epoch)
1025
1026         self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
1027                 clone_size=volsize)
1028
1029         reqs = Set([])
1030         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1031             size=size))
1032         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1033             size=size))
1034         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1035             size=size))
1036         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1037             size=size))
1038         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1039             size=size))
1040         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1041             size=size))
1042         reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1043             size=size))
1044         while len(reqs) > 0:
1045             req = self.xseg.wait_requests(reqs)
1046             self.evaluate_req(req, data=ret)
1047             reqs.remove(req)
1048             self.assertTrue(req.put())
1049
1050 class BlockerTest(object):
1051     def test_write_read(self):
1052         datalen = 1024
1053         data = get_random_string(datalen, 16)
1054         target = "mytarget"
1055         xinfo = self.get_reply_info(datalen)
1056
1057         self.send_and_evaluate_write(self.blockerport, target, data=data,
1058                 serviced=datalen)
1059         self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1060                 expected_data=data)
1061         self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1062         stop_peer(self.blocker)
1063         start_peer(self.blocker)
1064         self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1065                 expected_data=data)
1066         self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1067
1068     def test_info(self):
1069         datalen = 1024
1070         data = get_random_string(datalen, 16)
1071         target = "mytarget"
1072         self.send_and_evaluate_write(self.blockerport, target, data=data,
1073                 serviced=datalen)
1074         xinfo = self.get_reply_info(datalen)
1075         self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1076
1077     def test_copy(self):
1078         datalen = 1024
1079         data = get_random_string(datalen, 16)
1080         target = "mytarget"
1081         copy_target = "copy_target"
1082
1083         self.send_and_evaluate_write(self.blockerport, target, data=data,
1084                 serviced=datalen)
1085         self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1086                 expected_data=data, serviced=datalen)
1087         self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1088                 size=datalen, serviced=datalen)
1089         self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1090                 size=datalen+1, serviced=datalen+1)
1091         self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
1092                 expected_data=data)
1093
1094
1095     def test_delete(self):
1096         datalen = 1024
1097         data = get_random_string(datalen, 16)
1098         target = "mytarget"
1099         self.send_and_evaluate_delete(self.blockerport, target, False)
1100         self.send_and_evaluate_write(self.blockerport, target, data=data)
1101         self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1102                 expected_data=data)
1103         self.send_and_evaluate_delete(self.blockerport, target, True)
1104         data = '\x00' * datalen
1105         self.send_and_evaluate_read(self.blockerport, target, size=datalen, expected_data=data)
1106
1107     def test_hash(self):
1108         datalen = 1024
1109         data = '\x00'*datalen
1110         target = "target_zeros"
1111
1112
1113         self.send_and_evaluate_write(self.blockerport, target, data=data,
1114                 serviced=datalen)
1115         ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1116         self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1117                 expected_data=ret, serviced=datalen)
1118
1119         target = "mytarget"
1120         data = get_random_string(datalen, 16)
1121         self.send_and_evaluate_write(self.blockerport, target, data=data,
1122                 serviced=datalen)
1123         ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1124         self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1125                 expected_data=ret, serviced=datalen)
1126         self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1127                 expected_data=ret, serviced=datalen)
1128         self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1129                 expected_data=ret, serviced=datalen)
1130
1131     def test_locking(self):
1132         target = "mytarget"
1133         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1134         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1135         self.send_and_evaluate_release(self.blockerport, target, expected=True)
1136         self.send_and_evaluate_release(self.blockerport, target, expected=False)
1137         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1138         stop_peer(self.blocker)
1139         start_peer(self.blocker)
1140         self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1141         self.send_and_evaluate_release(self.blockerport, target, expected=False)
1142         self.send_and_evaluate_release(self.blockerport, target, force=True,
1143                 expected=True)
1144
1145
1146 class FiledTest(BlockerTest, XsegTest):
1147     filed_args = {
1148             'role': 'testfiled',
1149             'spec': XsegTest.spec,
1150             'nr_ops': 16,
1151             'archip_dir': '/tmp/filedtest/',
1152             'prefix': 'archip_',
1153             'portno_start': 0,
1154             'portno_end': 0,
1155             'daemon': True,
1156             'log_level': 3,
1157             }
1158
1159     def setUp(self):
1160         super(FiledTest, self).setUp()
1161         try:
1162             self.blocker = self.get_filed(self.filed_args, clean=True)
1163             self.blockerport = self.blocker.portno_start
1164             start_peer(self.blocker)
1165         except Exception as e:
1166             super(FiledTest, self).tearDown()
1167             raise e
1168
1169     def tearDown(self):
1170         stop_peer(self.blocker)
1171         super(FiledTest, self).tearDown()
1172
1173     def test_locking(self):
1174         target = "mytarget"
1175         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1176         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1177         self.send_and_evaluate_release(self.blockerport, target, expected=True)
1178         self.send_and_evaluate_release(self.blockerport, target, expected=False)
1179         self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1180         stop_peer(self.blocker)
1181         new_filed_args = copy(self.filed_args)
1182         new_filed_args['unique_str'] = 'ThisisSparta'
1183         self.blocker = Filed(**new_filed_args)
1184         start_peer(self.blocker)
1185         self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1186         self.send_and_evaluate_release(self.blockerport, target, expected=False)
1187         self.send_and_evaluate_release(self.blockerport, target, force=True,
1188                 expected=True)
1189
1190 class SosdTest(BlockerTest, XsegTest):
1191     filed_args = {
1192             'role': 'testsosd',
1193             'spec': XsegTest.spec,
1194             'nr_ops': 16,
1195             'portno_start': 0,
1196             'portno_end': 0,
1197             'daemon': True,
1198             'log_level': 3,
1199             'pool': 'test_sosd',
1200             'nr_threads': 3,
1201             }
1202
1203     def setUp(self):
1204         super(SosdTest, self).setUp()
1205         try:
1206             self.blocker = self.get_sosd(self.filed_args, clean=True)
1207             self.blockerport = self.blocker.portno_start
1208             start_peer(self.blocker)
1209         except Exception as e:
1210             super(SosdTest, self).tearDown()
1211             raise e
1212
1213     def tearDown(self):
1214         stop_peer(self.blocker)
1215         super(SosdTest, self).tearDown()
1216
1217 if __name__=='__main__':
1218     init()
1219     unittest.main()