# or implied, of GRNET S.A.
import archipelago
-from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
- create_segment, destroy_segment, Error
+from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Radosd, \
+ Error, Segment
from archipelago.archipelago import start_peer, stop_peer
import random as rnd
import unittest2 as unittest
def init():
rnd.seed()
- archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
+# archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
if not os.path.isdir(archipelago.common.LOGS_PATH):
recursive_remove(archipelago.common.LOGS_PATH)
class XsegTest(unittest.TestCase):
- xseg = None
- myport = 15
- spec = "posix:testsegment:16:256:12".encode()
+ spec = "posix:testsegment:8:16:256:12".encode()
blocksize = 4*1024*1024
+ segment = None
def setUp(self):
+ self.segment = Segment('posix', 'testsegment', 8, 16, 256, 12)
try:
- create_segment(self.spec)
+ self.segment.create()
except Exception as e:
- destroy_segment(self.spec)
- create_segment(self.spec)
- self.xseg = Xseg_ctx(self.spec, self.myport)
+ self.segment.destroy()
+ self.segment.create()
+ self.xseg = Xseg_ctx(self.segment)
def tearDown(self):
if self.xseg:
self.xseg.shutdown()
- destroy_segment(self.spec)
+ if self.segment:
+ self.segment.destroy()
@staticmethod
def get_reply_info(size):
self.assertTrue(req.put())
return send_and_evaluate
- def send_write(self, dst, target, data=None, offset=0, datalen=0):
+ def send_write(self, dst, target, data=None, offset=0, datalen=0, flags=0):
#assert datalen >= size
# req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
req = Request.get_write_request(self.xseg, dst, target, data=data,
- offset=offset, datalen=datalen)
+ offset=offset, datalen=datalen, flags=flags)
req.submit()
return req
return Filed(**args)
- def get_sosd(self, args, clean=False):
+ def get_radosd(self, args, clean=False):
pool = args['pool']
import rados
cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
cluster.create_pool(pool)
cluster.shutdown()
- return Sosd(**args)
+ return Radosd(**args)
def get_mapperd(self, args):
return Mapperd(**args)
req = self.xseg.wait_requests(reqs)
self.evaluate_req(req, data=xinfo)
reqs.remove(req)
+ self.assertTrue(req.put())
+
+ def test_flush(self):
+ datalen = 1024
+ data = get_random_string(datalen, 16)
+ volume = "myvolume"
+ volsize = 10*1024*1024
+
+ #This may seem weird, but a vlmcd flush only guarantees that
+ #there are no pending operations on the volume. On a volume that does not
+ #exist, this is always true, so this should succeed.
+ self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+ flags=XF_FLUSH, expected=True)
+ self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
+ clone_size=volsize)
+ self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+ flags=XF_FLUSH)
+ self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
+ serviced=datalen)
+ self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+ flags=XF_FLUSH)
+
+ def test_flush2(self):
+     """Submit a flush in the middle of a burst of un-awaited writes.
+
+     Issues a clone request for the volume, then submits 13 writes, one
+     flush request (an empty write carrying XF_FLUSH) and 6 more writes
+     to the vlmcd port without waiting in between. Every submitted
+     request is then waited for, evaluated, and checked for a
+     successful put().
+     """
+     volume = "myvolume"
+     volsize = 10*1024*1024
+     datalen = 1024
+     data = get_random_string(datalen, 16)
+     # Number of plain writes submitted before and after the flush.
+     writes_before_flush = 13
+     writes_after_flush = 6
+
+     self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
+                                  clone_size=volsize)
+     # NOTE(review): xinfo is never read below -- confirm whether this
+     # call is still needed before removing it.
+     xinfo = self.get_reply_info(volsize)
+     reqs = Set([])
+     for _ in range(writes_before_flush):
+         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+     reqs.add(self.send_write(self.vlmcdport, volume, data="", flags=XF_FLUSH))
+     for _ in range(writes_after_flush):
+         reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+
+     # Drain the pending set: wait for one request, evaluate it, drop it
+     # from the set and assert that put() succeeds.
+     while len(reqs) > 0:
+         req = self.xseg.wait_requests(reqs)
+         self.evaluate_req(req)
+         reqs.remove(req)
+         self.assertTrue(req.put())
def test_hash(self):
blocksize = self.blocksize
req = self.xseg.wait_requests(reqs)
self.evaluate_req(req, data=xinfo)
reqs.remove(req)
+ self.assertTrue(req.put())
def test_open(self):
volume = "myvolume"
req = self.xseg.wait_requests(reqs)
self.evaluate_req(req)
reqs.remove(req)
+ self.assertTrue(req.put())
def test_close(self):
volume = "myvolume"
req = self.xseg.wait_requests(reqs)
self.evaluate_req(req, data=ret)
reqs.remove(req)
+ self.assertTrue(req.put())
def test_mapw(self):
blocksize = self.blocksize
req = self.xseg.wait_requests(reqs)
self.evaluate_req(req, data=ret)
reqs.remove(req)
+ self.assertTrue(req.put())
class BlockerTest(object):
def test_write_read(self):
expected_data=data, serviced=datalen)
self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
size=datalen, serviced=datalen)
+ self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
+ size=datalen+1, serviced=datalen+1)
self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
expected_data=data)
self.send_and_evaluate_release(self.blockerport, target, force=True,
expected=True)
-class SosdTest(BlockerTest, XsegTest):
+class RadosdTest(BlockerTest, XsegTest):
filed_args = {
- 'role': 'testsosd',
+ 'role': 'testradosd',
'spec': XsegTest.spec,
'nr_ops': 16,
'portno_start': 0,
'portno_end': 0,
'daemon': True,
'log_level': 3,
- 'pool': 'test_sosd',
+ 'pool': 'test_radosd',
'nr_threads': 3,
}
def setUp(self):
- super(SosdTest, self).setUp()
+ super(RadosdTest, self).setUp()
try:
- self.blocker = self.get_sosd(self.filed_args, clean=True)
+ self.blocker = self.get_radosd(self.filed_args, clean=True)
self.blockerport = self.blocker.portno_start
start_peer(self.blocker)
except Exception as e:
- super(SosdTest, self).tearDown()
+ super(RadosdTest, self).tearDown()
raise e
def tearDown(self):
stop_peer(self.blocker)
- super(SosdTest, self).tearDown()
+ super(RadosdTest, self).tearDown()
if __name__=='__main__':
init()