Statistics
| Branch: | Tag: | Revision:

root / xseg / tools / qa / tests.py @ 6e1bf96f

History | View | Annotate | Download (46.8 kB)

1
# Copyright 2013 GRNET S.A. All rights reserved.
2
#
3
# Redistribution and use in source and binary forms, with or
4
# without modification, are permitted provided that the following
5
# conditions are met:
6
#
7
#   1. Redistributions of source code must retain the above
8
#      copyright notice, this list of conditions and the following
9
#      disclaimer.
10
#
11
#   2. Redistributions in binary form must reproduce the above
12
#      copyright notice, this list of conditions and the following
13
#      disclaimer in the documentation and/or other materials
14
#      provided with the distribution.
15
#
16
# THIS SOFTWARE IS PROVIDED BY GRNET S.A. ``AS IS'' AND ANY EXPRESS
17
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
19
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL GRNET S.A OR
20
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
21
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
22
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
23
# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
24
# AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
25
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
26
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27
# POSSIBILITY OF SUCH DAMAGE.
28
#
29
# The views and conclusions contained in the software and
30
# documentation are those of the authors and should not be
31
# interpreted as representing official policies, either expressed
32
# or implied, of GRNET S.A.
33

    
34
import archipelago
35
from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
36
        Error, Segment
37
from archipelago.archipelago import start_peer, stop_peer
38
import random as rnd
39
import unittest2 as unittest
40
from xseg.xprotocol import *
41
from xseg.xseg_api import *
42
import ctypes
43
import os
44
from copy import copy
45
from sets import Set
46
from binascii import hexlify, unhexlify
47
from hashlib import sha256
48

    
49
def get_random_string(length=64, repeat=16):
50
    nr_repeats = length//repeat
51

    
52
    l = []
53
    for i in range(repeat):
54
        l.append(chr(ord('a') + rnd.randint(0,25)))
55
    random_string = ''.join(l)
56

    
57
    l = []
58
    for i in range(nr_repeats):
59
        l.append(random_string)
60
    rem = length % repeat
61
    l.append(random_string[0:rem])
62

    
63
    return ''.join(l)
64

    
65
def recursive_remove(path):
66
    for root, dirs, files in os.walk(path, topdown=False):
67
        for name in files:
68
            os.remove(os.path.join(root, name))
69
        for name in dirs:
70
            os.rmdir(os.path.join(root, name))
71

    
72
def merkle_hash(hashes):
73
    if len(hashes) == 0:
74
        return sha256('').digest()
75
    if len(hashes) == 1:
76
        return hashes[0]
77

    
78
    s = 2
79
    while s < len(hashes):
80
        s = s * 2
81
    hashes += [('\x00' * len(hashes[0]))] * (s - len(hashes))
82
    while len(hashes) > 1 :
83
        hashes = [sha256(hashes[i] + hashes[i + 1]).digest() for i in range (0, len(hashes), 2)]
84
    return hashes[0]
85
    
86

    
87
def init():
88
    rnd.seed()
89
#    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
90
    archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
91
    archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
92
    if not os.path.isdir(archipelago.common.LOGS_PATH):
93
        os.makedirs(archipelago.common.LOGS_PATH)
94
    if not os.path.isdir(archipelago.common.PIDFILE_PATH):
95
        os.makedirs(archipelago.common.PIDFILE_PATH)
96

    
97
    recursive_remove(archipelago.common.LOGS_PATH)
98

    
99
class XsegTest(unittest.TestCase):
100
    spec = "posix:testsegment:8:16:256:12".encode()
101
    blocksize = 4*1024*1024
102
    segment = None
103

    
104
    def setUp(self):
105
        self.segment = Segment('posix', 'testsegment', 8, 16, 256, 12)
106
        try:
107
            self.segment.create()
108
        except Exception as e:
109
            self.segment.destroy()
110
            self.segment.create()
111
        self.xseg = Xseg_ctx(self.segment)
112

    
113
    def tearDown(self):
114
        if self.xseg:
115
            self.xseg.shutdown()
116
        if self.segment:
117
            self.segment.destroy()
118

    
119
    @staticmethod
120
    def get_reply_info(size):
121
        xinfo = xseg_reply_info()
122
        xinfo.size = size
123
        return xinfo
124

    
125
    @staticmethod
126
    def get_hash_reply(hashstring):
127
        xhash = xseg_reply_hash()
128
        xhash.target = hashstring
129
        xhash.targetlen = len(hashstring)
130
        return xhash
131

    
132
    @staticmethod
133
    def get_object_name(volume, epoch, index):
134
        epoch_64 = ctypes.c_uint64(epoch)
135
        index_64 = ctypes.c_uint64(index)
136
        epoch_64_char = ctypes.cast(ctypes.addressof(epoch_64), ctypes.c_char_p)
137
        index_64_char = ctypes.cast(ctypes.addressof(index_64), ctypes.c_char_p)
138
        epoch_64_str = ctypes.string_at(epoch_64_char, ctypes.sizeof(ctypes.c_uint64))
139
        index_64_str = ctypes.string_at(index_64_char, ctypes.sizeof(ctypes.c_uint64))
140
        epoch_hex = hexlify(epoch_64_str)
141
        index_hex = hexlify(index_64_str)
142
        return "archip_" + volume + "_" + epoch_hex + "_" + index_hex
143

    
144
    @staticmethod
145
    def get_map_reply(offset, size):
146
        blocksize = XsegTest.blocksize
147
        ret = xseg_reply_map()
148
        cnt = (offset+size)//blocksize - offset//blocksize
149
        if (offset+size) % blocksize > 0 :
150
            cnt += 1
151
        ret.cnt = cnt
152
        SegsArray = xseg_reply_map_scatterlist * cnt
153
        segs = SegsArray()
154
        rem_size = size
155
        offset = offset % blocksize
156
        for i in range(0, cnt):
157
            segs[i].offset = offset
158
            segs[i].size = blocksize - offset
159
            if segs[i].size > rem_size:
160
                segs[i].size = rem_size
161
            offset = 0
162
            rem_size -= segs[i].size
163
            if rem_size < 0 :
164
                raise Error("Calculation error")
165
        ret.segs = segs
166

    
167
        return ret
168

    
169
    @staticmethod
170
    def get_list_of_hashes(xreply, from_segment=False):
171
        hashes = []
172
        cnt = xreply.cnt
173
        segs = xreply.segs
174
        if from_segment:
175
            SegsArray = xseg_reply_map_scatterlist * cnt
176
            array = SegsArray.from_address(ctypes.addressof(segs))
177
            segs = array
178
        for i in range(0, cnt):
179
            hashes.append(ctypes.string_at(segs[i].target, segs[i].targetlen))
180
        return hashes
181

    
182
    @staticmethod
183
    def get_zero_map_reply(offset, size):
184
        ret = XsegTest.get_map_reply(offset, size);
185
        cnt = ret.cnt
186
        for i in range(0, cnt):
187
            ret.segs[i].target = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
188
            ret.segs[i].targetlen = len(ret.segs[i].target)
189
        return ret
190

    
191
    @staticmethod
192
    def get_copy_map_reply(volume, offset, size, epoch):
193
        blocksize = XsegTest.blocksize
194
        objidx_start = offset//blocksize
195
        ret = XsegTest.get_map_reply(offset, size);
196
        cnt = ret.cnt
197
        for i in range(0, cnt):
198
            ret.segs[i].target = MapperdTest.get_object_name(volume, epoch,
199
                    objidx_start+i)
200
            ret.segs[i].targetlen = len(ret.segs[i].target)
201
        return ret
202

    
203
    def get_req(self, op, dst, target, data=None, size=0, offset=0, datalen=0,
204
            flags=0):
205
        return Request(self.xseg, dst, target, data=data, size=size,
206
                offset=offset, datalen=datalen, flags=flags, op=op)
207

    
208
    def assert_equal_xseg(self, req, expected_data):
209
        if isinstance(expected_data, xseg_reply_info):
210
            datasize = ctypes.sizeof(expected_data)
211
            self.assertEqual(datasize, req.get_datalen())
212
            data = req.get_data(type(expected_data)).contents
213
            self.assertEqual(data.size, expected_data.size)
214
        elif isinstance(expected_data, xseg_reply_map):
215
            #since xseg_reply_map uses a flexible array for the
216
            #xseg_reply_map_scatterlist reply, we calculate the size of the
217
            #reply in the segment, by subtracting the size of the pointer to
218
            #the array, in the python object
219
            datasize = ctypes.sizeof(expected_data)
220
            datasize -= ctypes.sizeof(expected_data.segs)
221
            datasize += expected_data.cnt*ctypes.sizeof(xseg_reply_map_scatterlist)
222
            self.assertEqual(datasize, req.get_datalen())
223
            data = req.get_data(type(expected_data)).contents
224
            cnt = data.cnt
225
            self.assertEqual(data.cnt, expected_data.cnt)
226
            segs = data.segs
227
            SegsArray = xseg_reply_map_scatterlist * cnt
228
            array = SegsArray.from_address(ctypes.addressof(segs))
229
            expected_array = expected_data.segs
230
            for i in range(0, cnt):
231
                t = ctypes.string_at(array[i].target, array[i].targetlen)
232
                self.assertEqual(array[i].targetlen, expected_array[i].targetlen)
233
                self.assertEqual(t, expected_array[i].target)
234
                self.assertEqual(array[i].offset, expected_array[i].offset)
235
                self.assertEqual(array[i].size, expected_array[i].size)
236
        elif isinstance(expected_data, xseg_reply_hash):
237
            datasize = ctypes.sizeof(expected_data)
238
            self.assertEqual(datasize, req.get_datalen())
239
            data = req.get_data(type(expected_data)).contents
240
            self.assertEqual(data.targetlen, expected_data.targetlen)
241
            t = ctypes.string_at(data.target, data.targetlen)
242
            et = ctypes.string_at(expected_data.target, expected_data.targetlen)
243
            self.assertEqual(t, et)
244
        else:
245
            raise Error("Unknown data type")
246

    
247
    def evaluate_req(self, req, success=True, serviced=None, data=None):
248
        if not success:
249
            self.assertFalse(req.success())
250
            return
251

    
252
        self.assertTrue(req.success())
253
        if serviced is not None:
254
            self.assertEqual(req.get_serviced(), serviced)
255
        if data is not None:
256
            if isinstance(data, basestring):
257
                datalen = len(data)
258
                self.assertEqual(datalen, req.get_datalen())
259
                self.assertEqual(data, ctypes.string_at(req.get_data(None), datalen))
260
            else:
261
                self.assert_equal_xseg(req, data)
262

    
263
    def evaluate(send_func):
264
        def send_and_evaluate(self, dst, target, expected=True, serviced=None,
265
                expected_data=None, **kwargs):
266
            req = send_func(self, dst, target, **kwargs)
267
            req.wait()
268
            self.evaluate_req(req, success=expected, serviced=serviced,
269
                    data=expected_data)
270
            self.assertTrue(req.put())
271
        return send_and_evaluate
272

    
273
    def send_write(self, dst, target, data=None, offset=0, datalen=0, flags=0):
274
        #assert datalen >= size
275
#        req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
276
        req = Request.get_write_request(self.xseg, dst, target, data=data,
277
                offset=offset, datalen=datalen, flags=flags)
278
        req.submit()
279
        return req
280

    
281
    send_and_evaluate_write = evaluate(send_write)
282

    
283
    def send_read(self, dst, target, size=0, datalen=0, offset=0):
284
        #assert datalen >= size
285
#        req = self.get_req(X_READ, dst, target, data=None, size=size, offset=offset, datalen=datalen)
286
        req = Request.get_read_request(self.xseg, dst, target, size=size,
287
                offset=offset, datalen=datalen)
288
        req.submit()
289
        return req
290

    
291
    send_and_evaluate_read = evaluate(send_read)
292

    
293
    def send_info(self, dst, target):
294
        #req = self.get_req(X_INFO, dst, target, data=None, size=0)
295
        req = Request.get_info_request(self.xseg, dst, target)
296
        req.submit()
297
        return req
298

    
299
    send_and_evaluate_info = evaluate(send_info)
300

    
301
    def send_copy(self, dst, src_target, dst_target=None, size=0, offset=0):
302
        #datalen = ctypes.sizeof(xseg_request_copy)
303
        #xcopy = xseg_request_copy()
304
        #xcopy.target = src_target
305
        #xcopy.targetlen = len(src_target)
306
#        req = self.get_req(X_COPY, dst, dst_target, data=xcopy, datalen=datalen,
307
#                offset=offset, size=size)
308
        req = Request.get_copy_request(self.xseg, dst, src_target,
309
                copy_target=dst_target, size=size, offset=offset)
310
        req.submit()
311
        return req
312

    
313
    send_and_evaluate_copy = evaluate(send_copy)
314

    
315
    def send_acquire(self, dst, target):
316
        #req = self.get_req(X_ACQUIRE, dst, target, flags=XF_NOSYNC)
317
        req = Request.get_acquire_request(self.xseg, dst, target)
318
        req.submit()
319
        return req
320

    
321
    send_and_evaluate_acquire = evaluate(send_acquire)
322

    
323
    def send_release(self, dst, target, force=False):
324
        #req_flags = XF_NOSYNC
325
        #if force:
326
            #req_flags |= XF_FORCE
327
        #req = self.get_req(X_RELEASE, dst, target, size=0, flags=req_flags)
328
        req = Request.get_release_request(self.xseg, dst, target, force)
329
        req.submit()
330
        return req
331

    
332
    send_and_evaluate_release = evaluate(send_release)
333

    
334
    def send_delete(self, dst, target):
335
        #req = self.get_req(X_DELETE, dst, target)
336
        req = Request.get_delete_request(self.xseg, dst, target)
337
        req.submit()
338
        return req
339

    
340
    send_and_evaluate_delete = evaluate(send_delete)
341

    
342
    def send_clone(self, dst, src_target, clone=None, clone_size=0,
343
            cont_addr=False):
344
        #xclone = xseg_request_clone()
345
        #xclone.target = src_target
346
        #xclone.targetlen = len(src_target)
347
        #xclone.size = clone_size
348

    
349
        #req = self.get_req(X_CLONE, dst, clone, data=xclone,
350
                #datalen=ctypes.sizeof(xclone))
351
        req = Request.get_clone_request(self.xseg, dst, src_target,
352
                clone=clone, clone_size=clone_size, cont_addr=cont_addr)
353
        req.submit()
354
        return req
355

    
356
    send_and_evaluate_clone = evaluate(send_clone)
357

    
358
    def send_snapshot(self, dst, src_target, snap=None):
359
        #xsnapshot = xseg_request_snapshot()
360
        #xsnapshot.target = snap
361
        #xsnapshot.targetlen = len(snap)
362

    
363
        #req = self.get_req(X_SNAPSHOT, dst, src_target, data=xsnapshot,
364
                #datalen=ctypes.sizeof(xsnapshot))
365
        req = Request.get_snapshot_request(self.xseg, dst, src_target, snap=snap)
366
        req.submit()
367
        return req
368

    
369
    send_and_evaluate_snapshot = evaluate(send_snapshot)
370

    
371
    def send_open(self, dst, target):
372
        #req = self.get_req(X_OPEN, dst, target)
373
        req = Request.get_open_request(self.xseg, dst, target)
374
        req.submit()
375
        return req
376

    
377
    send_and_evaluate_open = evaluate(send_open)
378

    
379
    def send_close(self, dst, target):
380
        #req = self.get_req(X_CLOSE, dst, target)
381
        req = Request.get_close_request(self.xseg, dst, target)
382
        req.submit()
383
        return req
384

    
385
    send_and_evaluate_close = evaluate(send_close)
386

    
387
    def send_map_read(self, dst, target, offset=0, size=0):
388
        #req = self.get_req(X_MAPR, dst, target, size=size, offset=offset,
389
                #datalen=0)
390
        req = Request.get_mapr_request(self.xseg, dst, target, offset=offset,
391
                size=size)
392
        req.submit()
393
        return req
394

    
395
    send_and_evaluate_map_read = evaluate(send_map_read)
396

    
397
    def send_map_write(self, dst, target, offset=0, size=0):
398
        #req = self.get_req(X_MAPW, dst, target, size=size, offset=offset,
399
                #datalen=0)
400
        req = Request.get_mapw_request(self.xseg, dst, target, offset=offset,
401
                size=size)
402
        req.submit()
403
        return req
404

    
405
    send_and_evaluate_map_write = evaluate(send_map_write)
406

    
407
    def send_hash(self, dst, target, size=0):
408
        #req = self.get_req(X_hash, dst, target, data=None, size=0)
409
        req = Request.get_hash_request(self.xseg, dst, target, size=size)
410
        req.submit()
411
        return req
412

    
413
    send_and_evaluate_hash = evaluate(send_hash)
414

    
415
    def get_filed(self, args, clean=False):
416
        path = args['archip_dir']
417
        if not os.path.exists(path):
418
            os.makedirs(path)
419

    
420
        if clean:
421
            recursive_remove(path)
422

    
423
        return Filed(**args)
424

    
425
    def get_sosd(self, args, clean=False):
426
        pool = args['pool']
427
        import rados
428
        cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
429
        cluster.connect()
430
        if cluster.pool_exists(pool):
431
            cluster.delete_pool(pool)
432
        cluster.create_pool(pool)
433

    
434
        cluster.shutdown()
435
        return Sosd(**args)
436

    
437
    def get_mapperd(self, args):
438
        return Mapperd(**args)
439

    
440
    def get_vlmcd(self, args):
441
        return Vlmcd(**args)
442

    
443
class VlmcdTest(XsegTest):
444
    bfiled_args = {
445
            'role': 'vlmctest-blockerb',
446
            'spec': XsegTest.spec,
447
            'nr_ops': 16,
448
            'archip_dir': '/tmp/bfiledtest/',
449
            'prefix': 'archip_',
450
            'portno_start': 0,
451
            'portno_end': 0,
452
            'daemon': True,
453
            'log_level': 3,
454
            }
455
    mfiled_args = {
456
            'role': 'vlmctest-blockerm',
457
            'spec': XsegTest.spec,
458
            'nr_ops': 16,
459
            'archip_dir': '/tmp/mfiledtest/',
460
            'prefix': 'archip_',
461
            'portno_start': 1,
462
            'portno_end': 1,
463
            'daemon': True,
464
            'log_level': 3,
465
            }
466
    mapperd_args = {
467
            'role': 'vlmctest-mapper',
468
            'spec': XsegTest.spec,
469
            'nr_ops': 16,
470
            'portno_start': 2,
471
            'portno_end': 2,
472
            'daemon': True,
473
            'log_level': 3,
474
            'blockerb_port': 0,
475
            'blockerm_port': 1,
476
            }
477
    vlmcd_args = {
478
            'role': 'vlmctest-vlmc',
479
            'spec': XsegTest.spec,
480
            'nr_ops': 16,
481
            'portno_start': 3,
482
            'portno_end': 3,
483
            'daemon': True,
484
            'log_level': 3,
485
            'blocker_port': 0,
486
            'mapper_port': 2
487
            }
488

    
489
    def setUp(self):
490
        super(VlmcdTest, self).setUp()
491
        try:
492
            self.blockerm = self.get_filed(self.mfiled_args, clean=True)
493
            self.blockerb = self.get_filed(self.bfiled_args, clean=True)
494
            self.mapperd = self.get_mapperd(self.mapperd_args)
495
            self.vlmcd = self.get_vlmcd(self.vlmcd_args)
496
            self.vlmcdport = self.vlmcd.portno_start
497
            self.mapperdport = self.mapperd.portno_start
498
            self.blockerbport = self.blockerb.portno_start
499
            start_peer(self.blockerm)
500
            start_peer(self.blockerb)
501
            start_peer(self.mapperd)
502
            start_peer(self.vlmcd)
503
        except Exception as e:
504
            print e
505
            stop_peer(self.vlmcd)
506
            stop_peer(self.mapperd)
507
            stop_peer(self.blockerb)
508
            stop_peer(self.blockerm)
509
            super(VlmcdTest, self).tearDown()
510
            raise e
511

    
512
    def tearDown(self):
513
        stop_peer(self.vlmcd)
514
        stop_peer(self.mapperd)
515
        stop_peer(self.blockerb)
516
        stop_peer(self.blockerm)
517
        super(VlmcdTest, self).tearDown()
518

    
519
    def test_open(self):
520
        volume = "myvolume"
521
        volsize = 10*1024*1024
522
        self.send_and_evaluate_open(self.vlmcdport, volume, expected=False)
523
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
524
                clone_size=volsize)
525
        self.send_and_evaluate_open(self.vlmcdport, volume)
526
        self.send_and_evaluate_open(self.vlmcdport, volume)
527

    
528
    def test_close(self):
529
        volume = "myvolume"
530
        volsize = 10*1024*1024
531
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
532
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
533
                clone_size=volsize)
534
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
535
        self.send_and_evaluate_open(self.vlmcdport, volume)
536
        self.send_and_evaluate_close(self.vlmcdport, volume)
537
        self.send_and_evaluate_close(self.vlmcdport, volume, expected=False)
538

    
539
    def test_info(self):
540
        volume = "myvolume"
541
        volsize = 10*1024*1024
542
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
543
                clone_size=volsize)
544
        xinfo = self.get_reply_info(volsize)
545
        self.send_and_evaluate_info(self.vlmcdport, volume, expected_data=xinfo)
546

    
547
    def test_write_read(self):
548
        datalen = 1024
549
        data = get_random_string(datalen, 16)
550
        volume = "myvolume"
551
        volsize = 10*1024*1024
552

    
553
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
554
                expected=False)
555
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
556
                expected=False)
557
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
558
                clone_size=volsize)
559
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
560
                serviced=datalen)
561
        self.send_and_evaluate_read(self.vlmcdport, volume, size=datalen,
562
                expected_data=data)
563

    
564
    def test_clone_snapshot(self):
565
        volume = "myvolume"
566
        snap = "mysnapshot"
567
        snap2 = "mysnapshot2"
568
        snap2 = "mysnapshot3"
569
        clone1 = "myclone1"
570
        clone2 = "myclone2"
571

    
572
        volsize = 100*1024*1024*1024
573
        clone2size = 200*1024*1024*1024
574
        offset = 90*1024*1024*1024
575
        size = 10*1024*1024
576

    
577
        zeros = '\x00' * size
578
        data = get_random_string(size, 16)
579
        data2 = get_random_string(size, 16)
580

    
581
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
582
                clone_size=volsize)
583
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
584
                offset=offset, expected_data=zeros)
585

    
586
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
587
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
588
                offset=offset, expected_data=zeros)
589
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data, offset=offset,
590
                serviced=size)
591
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
592
                offset=offset, expected_data=zeros)
593
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
594
                offset=offset, expected_data=data)
595

    
596
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap2)
597
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
598
                offset=offset, expected_data=data)
599
        self.send_and_evaluate_clone(self.mapperdport, snap2, clone=clone1,
600
                clone_size=clone2size)
601
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
602
                offset=offset, expected_data=data)
603
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
604
                offset=volsize+offset, expected_data=zeros)
605

    
606
        self.send_and_evaluate_write(self.vlmcdport, clone1, data=data2,
607
                                offset=offset, serviced=size)
608
        self.send_and_evaluate_read(self.vlmcdport, clone1, size=size,
609
                offset=offset, expected_data=data2)
610
        self.send_and_evaluate_read(self.vlmcdport, snap2, size=size,
611
                offset=offset, expected_data=data)
612
        self.send_and_evaluate_read(self.vlmcdport, volume, size=size,
613
                offset=offset, expected_data=data)
614
        self.send_and_evaluate_read(self.vlmcdport, snap, size=size,
615
                offset=offset, expected_data=zeros)
616

    
617
    def test_info2(self):
618
        volume = "myvolume"
619
        volsize = 10*1024*1024
620
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
621
                clone_size=volsize)
622
        xinfo = self.get_reply_info(volsize)
623
        reqs = Set([])
624
        reqs.add(self.send_info(self.vlmcdport, volume))
625
        reqs.add(self.send_info(self.vlmcdport, volume))
626
        while len(reqs) > 0:
627
            req = self.xseg.wait_requests(reqs)
628
            self.evaluate_req(req, data=xinfo)
629
            reqs.remove(req)
630
            self.assertTrue(req.put())
631

    
632
    def test_flush(self):
633
        datalen = 1024
634
        data = get_random_string(datalen, 16)
635
        volume = "myvolume"
636
        volsize = 10*1024*1024
637

    
638
        #This may seems weird, but actually vlmcd flush, only guarantees that
639
        #there are no pending operation the volume. On a volume that does not
640
        #exists, this is always true, so this should succeed.
641
        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
642
                flags=XF_FLUSH, expected=True)
643
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
644
                clone_size=volsize)
645
        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
646
                flags=XF_FLUSH)
647
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
648
                serviced=datalen)
649
        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
650
                flags=XF_FLUSH)
651

    
652
    def test_flush2(self):
653
        volume = "myvolume"
654
        volsize = 10*1024*1024
655
        datalen = 1024
656
        data = get_random_string(datalen, 16)
657

    
658
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
659
                clone_size=volsize)
660
        xinfo = self.get_reply_info(volsize)
661
        reqs = Set([])
662
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
663
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
664
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
665
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
666
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
667
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
668
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
669
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
670
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
671
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
672
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
673
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
674
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
675
        reqs.add(self.send_write(self.vlmcdport, volume, data="", flags=XF_FLUSH))
676
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
677
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
678
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
679
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
680
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
681
        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
682
        while len(reqs) > 0:
683
            req = self.xseg.wait_requests(reqs)
684
            self.evaluate_req(req)
685
            reqs.remove(req)
686
            self.assertTrue(req.put())
687

    
688
    def test_hash(self):
689
        blocksize = self.blocksize
690
        volume = "myvolume"
691
        volume2 = "myvolume2"
692
        snap = "snapshot"
693
        clone = "clone"
694
        volsize = 10*1024*1024
695
        size = 512*1024
696
        epoch = 1
697
        offset = 0
698
        data = get_random_string(size, 16)
699

    
700
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
701
                clone_size=volsize)
702
        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
703
                                offset=offset, serviced=size)
704

    
705
        self.send_and_evaluate_snapshot(self.vlmcdport, volume, snap=snap)
706

    
707
        self.send_and_evaluate_hash(self.mapperdport, volume, size=volsize,
708
                expected=False)
709
        req = self.send_hash(self.mapperdport, snap, size=volsize)
710
        req.wait()
711
        self.assertTrue(req.success())
712
        xreply = req.get_data(xseg_reply_hash).contents
713
        hash_map = ctypes.string_at(xreply.target, xreply.targetlen)
714
        req.put()
715

    
716
        req = self.send_map_read(self.mapperdport, snap, offset=0,
717
                size=volsize)
718
        req.wait()
719
        self.assertTrue(req.success())
720
        xreply = req.get_data(xseg_reply_map).contents
721
        blocks = self.get_list_of_hashes(xreply, from_segment=True)
722
        req.put()
723
        h = []
724
        for b in blocks:
725
            if (b == sha256('').hexdigest()):
726
                h.append(unhexlify(b))
727
                continue
728
            req = self.send_hash(self.blockerbport, b, size=blocksize)
729
            req.wait()
730
            self.assertTrue(req.success())
731
            xreply = req.get_data(xseg_reply_hash).contents
732
            h.append(unhexlify(ctypes.string_at(xreply.target, xreply.targetlen)))
733
            req.put()
734

    
735
        mh = hexlify(merkle_hash(h))
736
        self.assertEqual(hash_map, mh)
737

    
738
        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
739
                clone_size=volsize * 2, expected=False)
740
        self.send_and_evaluate_clone(self.mapperdport, hash_map, clone=volume2,
741
                clone_size=volsize * 2, cont_addr=True)
742
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=size,
743
                offset=offset, expected_data=data)
744
        self.send_and_evaluate_read(self.vlmcdport, volume2, size=volsize - size,
745
                offset=offset + size, expected_data='\x00' * (volsize - size))
746

    
747

    
748
class MapperdTest(XsegTest):
749
    bfiled_args = {
750
            'role': 'mappertest-blockerb',
751
            'spec': XsegTest.spec,
752
            'nr_ops': 16,
753
            'archip_dir': '/tmp/bfiledtest/',
754
            'prefix': 'archip_',
755
            'portno_start': 0,
756
            'portno_end': 0,
757
            'daemon': True,
758
            'log_level': 3,
759
            }
760
    mfiled_args = {
761
            'role': 'mappertest-blockerm',
762
            'spec': XsegTest.spec,
763
            'nr_ops': 16,
764
            'archip_dir': '/tmp/mfiledtest/',
765
            'prefix': 'archip_',
766
            'portno_start': 1,
767
            'portno_end': 1,
768
            'daemon': True,
769
            'log_level': 3,
770
            }
771
    mapperd_args = {
772
            'role': 'mappertest-mapper',
773
            'spec': XsegTest.spec,
774
            'nr_ops': 16,
775
            'portno_start': 2,
776
            'portno_end': 2,
777
            'daemon': True,
778
            'log_level': 3,
779
            'blockerb_port': 0,
780
            'blockerm_port': 1,
781
            }
782
    blocksize = 4*1024*1024
783

    
784
    def setUp(self):
785
        super(MapperdTest, self).setUp()
786
        try:
787
            self.blockerm = self.get_filed(self.mfiled_args, clean=True)
788
            self.blockerb = self.get_filed(self.bfiled_args, clean=True)
789
            self.mapperd = self.get_mapperd(self.mapperd_args)
790
            self.mapperdport = self.mapperd.portno_start
791
            start_peer(self.blockerm)
792
            start_peer(self.blockerb)
793
            start_peer(self.mapperd)
794
        except Exception as e:
795
            print e
796
            stop_peer(self.mapperd)
797
            stop_peer(self.blockerb)
798
            stop_peer(self.blockerm)
799
            super(MapperdTest, self).tearDown()
800
            raise e
801

    
802
    def tearDown(self):
803
        stop_peer(self.mapperd)
804
        stop_peer(self.blockerb)
805
        stop_peer(self.blockerm)
806
        super(MapperdTest, self).tearDown()
807

    
808
    def test_create(self):
809
        volume = "myvolume"
810
        volsize = 10*1024*1024
811
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
812
                clone_size=0, expected=False)
813
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
814
                clone_size=volsize)
815
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
816
                clone_size=volsize, expected=False)
817

    
818
    def test_delete(self):
819
        volume = "myvolume"
820
        volsize = 10*1024*1024
821
        offset = 0
822
        size = 10
823
        epoch = 2
824

    
825
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
826
                clone_size=0, expected=False)
827
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
828
                clone_size=volsize)
829
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
830
                clone_size=volsize, expected=False)
831
        self.send_and_evaluate_delete(self.mapperdport, volume)
832
        self.send_and_evaluate_delete(self.mapperdport, volume, expected=False)
833
        self.send_and_evaluate_map_write(self.mapperdport, volume,
834
                offset=offset, size=size, expected=False)
835
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
836
                clone_size=volsize)
837

    
838
        ret = self.get_copy_map_reply(volume, offset, size, epoch)
839

    
840
        self.send_and_evaluate_map_write(self.mapperdport, volume,
841
                expected_data=ret, offset=offset, size=size)
842

    
843
    def test_clone_snapshot(self):
844
        volume = "myvolume"
845
        snap = "mysnapshot"
846
        snap2 = "mysnapshot2"
847
        snap2 = "mysnapshot3"
848
        clone1 = "myclone1"
849
        clone2 = "myclone2"
850
        volsize = 100*1024*1024*1024
851
        clone2size = 200*1024*1024*1024
852
        offset = 90*1024*1024*1024
853
        size = 10*1024*1024
854
        offset = 0
855
        size = volsize
856
        epoch = 2
857

    
858
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
859
                clone_size=volsize)
860
        self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap)
861
        self.send_and_evaluate_snapshot(self.mapperdport, volume, snap=snap,
862
                expected=False)
863
        xinfo = self.get_reply_info(volsize)
864
        self.send_and_evaluate_info(self.mapperdport, snap, expected_data=xinfo)
865
        ret = self.get_zero_map_reply(offset, size)
866
        self.send_and_evaluate_map_read(self.mapperdport, volume,
867
                expected_data=ret, offset=offset, size=size)
868
        self.send_and_evaluate_map_read(self.mapperdport, snap,
869
                expected_data=ret, offset=offset, size=size)
870

    
871
        ret = self.get_copy_map_reply(volume, offset, size, epoch)
872
        self.send_and_evaluate_map_write(self.mapperdport, volume,
873
                expected_data=ret, offset=offset, size=size)
874

    
875
        stop_peer(self.mapperd)
876
        start_peer(self.mapperd)
877
        self.send_and_evaluate_map_read(self.mapperdport, volume,
878
                expected_data=ret, offset=offset, size=size)
879

    
880
        self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone1)
881
        xinfo = self.get_reply_info(volsize)
882
        self.send_and_evaluate_info(self.mapperdport, clone1, expected_data=xinfo)
883
        self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
884
                clone_size=2, expected=False)
885
        self.send_and_evaluate_clone(self.mapperdport, snap, clone=clone2,
886
                clone_size=clone2size)
887
        xinfo = self.get_reply_info(clone2size)
888
        self.send_and_evaluate_info(self.mapperdport, clone2, expected_data=xinfo)
889

    
890
    def test_info(self):
891
        volume = "myvolume"
892
        volsize = 10*1024*1024
893
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
894
                clone_size=volsize)
895
        xinfo = self.get_reply_info(volsize)
896
        self.send_and_evaluate_info(self.mapperdport, volume, expected_data=xinfo)
897

    
898
    def test_info2(self):
899
        volume = "myvolume"
900
        volsize = 10*1024*1024
901
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
902
                clone_size=volsize)
903
        xinfo = self.get_reply_info(volsize)
904
        reqs = Set([])
905
        reqs.add(self.send_info(self.mapperdport, volume))
906
        reqs.add(self.send_info(self.mapperdport, volume))
907
        while len(reqs) > 0:
908
            req = self.xseg.wait_requests(reqs)
909
            self.evaluate_req(req, data=xinfo)
910
            reqs.remove(req)
911
            self.assertTrue(req.put())
912

    
913
    def test_open(self):
914
        volume = "myvolume"
915
        volsize = 10*1024*1024
916
        self.send_and_evaluate_open(self.mapperdport, volume, expected=False)
917
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
918
                clone_size=volsize)
919
        self.send_and_evaluate_open(self.mapperdport, volume)
920
        self.send_and_evaluate_open(self.mapperdport, volume)
921

    
922
    def test_open2(self):
923
        volume = "myvolume"
924
        volsize = 10*1024*1024
925
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
926
                clone_size=volsize)
927
        reqs = Set([])
928
        reqs.add(self.send_open(self.mapperdport, volume))
929
        reqs.add(self.send_open(self.mapperdport, volume))
930
        while len(reqs) > 0:
931
            req = self.xseg.wait_requests(reqs)
932
            self.evaluate_req(req)
933
            reqs.remove(req)
934
            self.assertTrue(req.put())
935

    
936
    def test_close(self):
937
        volume = "myvolume"
938
        volsize = 10*1024*1024
939
        self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
940
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
941
                clone_size=volsize)
942
        self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
943
        self.send_and_evaluate_open(self.mapperdport, volume)
944
        self.send_and_evaluate_close(self.mapperdport, volume)
945
        self.send_and_evaluate_close(self.mapperdport, volume, expected=False)
946

    
947
    def test_mapr(self):
948
        volume = "myvolume"
949
        volsize = 10*1024*1024
950
        offset = 0
951
        size = volsize
952
        ret = MapperdTest.get_zero_map_reply(offset, size)
953
        self.send_and_evaluate_map_read(self.mapperdport, volume,
954
                offset=offset, size=size, expected=False)
955
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
956
                clone_size=volsize)
957
        self.send_and_evaluate_map_read(self.mapperdport, volume,
958
                expected_data=ret, offset=offset, size=size)
959
        offset = volsize - 1
960
        self.send_and_evaluate_map_read(self.mapperdport, volume,
961
                offset=offset, size=size, expected=False)
962
        offset = volsize + 1
963
        self.send_and_evaluate_map_read(self.mapperdport, volume,
964
                offset=offset, size=size, expected=False)
965

    
966
    def test_mapr2(self):
967
        volume = "myvolume"
968
        volsize = 10*1024*1024
969
        offset = 0
970
        size = volsize
971

    
972
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
973
                clone_size=volsize)
974
        reqs = Set([])
975
        reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
976
            size=size))
977
        reqs.add(self.send_map_read(self.mapperdport, volume, offset=offset,
978
            size=size))
979
        ret = MapperdTest.get_zero_map_reply(offset, size)
980
        while len(reqs) > 0:
981
            req = self.xseg.wait_requests(reqs)
982
            self.evaluate_req(req, data=ret)
983
            reqs.remove(req)
984
            self.assertTrue(req.put())
985

    
986
    def test_mapw(self):
987
        blocksize = self.blocksize
988
        volume = "myvolume"
989
        volsize = 100*1024*1024*1024
990
        offset = 90*1024*1024*1024 - 2
991
        size = 512*1024
992
        epoch = 1
993

    
994
        ret = self.get_copy_map_reply(volume, offset, size, epoch)
995

    
996
        self.send_and_evaluate_map_write(self.mapperdport, volume,
997
                offset=offset, size=size, expected=False)
998
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
999
                clone_size=volsize)
1000
        self.send_and_evaluate_map_write(self.mapperdport, volume,
1001
                expected_data=ret, offset=offset, size=size)
1002
        self.send_and_evaluate_map_read(self.mapperdport, volume,
1003
                expected_data = ret, offset=offset, size=size)
1004
        stop_peer(self.mapperd)
1005
        start_peer(self.mapperd)
1006
        self.send_and_evaluate_map_read(self.mapperdport, volume,
1007
                expected_data=ret, offset=offset, size=size)
1008
        self.send_and_evaluate_open(self.mapperdport, volume)
1009
        offset = 101*1024*1024*1024
1010
        self.send_and_evaluate_map_write(self.mapperdport, volume,
1011
                offset=offset, size=size, expected=False)
1012
        offset = 100*1024*1024*1024 - 1
1013
        self.send_and_evaluate_map_write(self.mapperdport, volume,
1014
                offset=offset, size=size, expected=False)
1015

    
1016
    def test_mapw2(self):
1017
        blocksize = self.blocksize
1018
        volume = "myvolume"
1019
        volsize = 100*1024*1024*1024
1020
        offset = 90*1024*1024*1024 - 2
1021
        size = 512*1024
1022
        epoch = 1
1023

    
1024
        ret = self.get_copy_map_reply(volume, offset, size, epoch)
1025

    
1026
        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
1027
                clone_size=volsize)
1028

    
1029
        reqs = Set([])
1030
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1031
            size=size))
1032
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1033
            size=size))
1034
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1035
            size=size))
1036
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1037
            size=size))
1038
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1039
            size=size))
1040
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1041
            size=size))
1042
        reqs.add(self.send_map_write(self.mapperdport, volume, offset=offset,
1043
            size=size))
1044
        while len(reqs) > 0:
1045
            req = self.xseg.wait_requests(reqs)
1046
            self.evaluate_req(req, data=ret)
1047
            reqs.remove(req)
1048
            self.assertTrue(req.put())
1049

    
1050
class BlockerTest(object):
1051
    def test_write_read(self):
1052
        datalen = 1024
1053
        data = get_random_string(datalen, 16)
1054
        target = "mytarget"
1055
        xinfo = self.get_reply_info(datalen)
1056

    
1057
        self.send_and_evaluate_write(self.blockerport, target, data=data,
1058
                serviced=datalen)
1059
        self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1060
                expected_data=data)
1061
        self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1062
        stop_peer(self.blocker)
1063
        start_peer(self.blocker)
1064
        self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1065
                expected_data=data)
1066
        self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1067

    
1068
    def test_info(self):
1069
        datalen = 1024
1070
        data = get_random_string(datalen, 16)
1071
        target = "mytarget"
1072
        self.send_and_evaluate_write(self.blockerport, target, data=data,
1073
                serviced=datalen)
1074
        xinfo = self.get_reply_info(datalen)
1075
        self.send_and_evaluate_info(self.blockerport, target, expected_data=xinfo)
1076

    
1077
    def test_copy(self):
1078
        datalen = 1024
1079
        data = get_random_string(datalen, 16)
1080
        target = "mytarget"
1081
        copy_target = "copy_target"
1082

    
1083
        self.send_and_evaluate_write(self.blockerport, target, data=data,
1084
                serviced=datalen)
1085
        self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1086
                expected_data=data, serviced=datalen)
1087
        self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1088
                size=datalen, serviced=datalen)
1089
        self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
1090
                size=datalen+1, serviced=datalen+1)
1091
        self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
1092
                expected_data=data)
1093

    
1094

    
1095
    def test_delete(self):
1096
        datalen = 1024
1097
        data = get_random_string(datalen, 16)
1098
        target = "mytarget"
1099
        self.send_and_evaluate_delete(self.blockerport, target, False)
1100
        self.send_and_evaluate_write(self.blockerport, target, data=data)
1101
        self.send_and_evaluate_read(self.blockerport, target, size=datalen,
1102
                expected_data=data)
1103
        self.send_and_evaluate_delete(self.blockerport, target, True)
1104
        data = '\x00' * datalen
1105
        self.send_and_evaluate_read(self.blockerport, target, size=datalen, expected_data=data)
1106

    
1107
    def test_hash(self):
1108
        datalen = 1024
1109
        data = '\x00'*datalen
1110
        target = "target_zeros"
1111

    
1112

    
1113
        self.send_and_evaluate_write(self.blockerport, target, data=data,
1114
                serviced=datalen)
1115
        ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1116
        self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1117
                expected_data=ret, serviced=datalen)
1118

    
1119
        target = "mytarget"
1120
        data = get_random_string(datalen, 16)
1121
        self.send_and_evaluate_write(self.blockerport, target, data=data,
1122
                serviced=datalen)
1123
        ret = self.get_hash_reply(sha256(data.rstrip('\x00')).hexdigest())
1124
        self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1125
                expected_data=ret, serviced=datalen)
1126
        self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1127
                expected_data=ret, serviced=datalen)
1128
        self.send_and_evaluate_hash(self.blockerport, target, size=datalen,
1129
                expected_data=ret, serviced=datalen)
1130

    
1131
    def test_locking(self):
1132
        target = "mytarget"
1133
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1134
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1135
        self.send_and_evaluate_release(self.blockerport, target, expected=True)
1136
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
1137
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1138
        stop_peer(self.blocker)
1139
        start_peer(self.blocker)
1140
        self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1141
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
1142
        self.send_and_evaluate_release(self.blockerport, target, force=True,
1143
                expected=True)
1144

    
1145

    
1146
class FiledTest(BlockerTest, XsegTest):
1147
    filed_args = {
1148
            'role': 'testfiled',
1149
            'spec': XsegTest.spec,
1150
            'nr_ops': 16,
1151
            'archip_dir': '/tmp/filedtest/',
1152
            'prefix': 'archip_',
1153
            'portno_start': 0,
1154
            'portno_end': 0,
1155
            'daemon': True,
1156
            'log_level': 3,
1157
            }
1158

    
1159
    def setUp(self):
1160
        super(FiledTest, self).setUp()
1161
        try:
1162
            self.blocker = self.get_filed(self.filed_args, clean=True)
1163
            self.blockerport = self.blocker.portno_start
1164
            start_peer(self.blocker)
1165
        except Exception as e:
1166
            super(FiledTest, self).tearDown()
1167
            raise e
1168

    
1169
    def tearDown(self):
1170
        stop_peer(self.blocker)
1171
        super(FiledTest, self).tearDown()
1172

    
1173
    def test_locking(self):
1174
        target = "mytarget"
1175
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1176
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1177
        self.send_and_evaluate_release(self.blockerport, target, expected=True)
1178
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
1179
        self.send_and_evaluate_acquire(self.blockerport, target, expected=True)
1180
        stop_peer(self.blocker)
1181
        new_filed_args = copy(self.filed_args)
1182
        new_filed_args['unique_str'] = 'ThisisSparta'
1183
        self.blocker = Filed(**new_filed_args)
1184
        start_peer(self.blocker)
1185
        self.send_and_evaluate_acquire(self.blockerport, target, expected=False)
1186
        self.send_and_evaluate_release(self.blockerport, target, expected=False)
1187
        self.send_and_evaluate_release(self.blockerport, target, force=True,
1188
                expected=True)
1189

    
1190
class SosdTest(BlockerTest, XsegTest):
1191
    filed_args = {
1192
            'role': 'testsosd',
1193
            'spec': XsegTest.spec,
1194
            'nr_ops': 16,
1195
            'portno_start': 0,
1196
            'portno_end': 0,
1197
            'daemon': True,
1198
            'log_level': 3,
1199
            'pool': 'test_sosd',
1200
            'nr_threads': 3,
1201
            }
1202

    
1203
    def setUp(self):
1204
        super(SosdTest, self).setUp()
1205
        try:
1206
            self.blocker = self.get_sosd(self.filed_args, clean=True)
1207
            self.blockerport = self.blocker.portno_start
1208
            start_peer(self.blocker)
1209
        except Exception as e:
1210
            super(SosdTest, self).tearDown()
1211
            raise e
1212

    
1213
    def tearDown(self):
1214
        stop_peer(self.blocker)
1215
        super(SosdTest, self).tearDown()
1216

    
1217
if __name__=='__main__':
1218
    init()
1219
    unittest.main()