Merge branch 'develop' into debian-develop
[archipelago] / tests / tests.py
similarity index 91%
rename from xseg/tools/qa/tests.py
rename to tests/tests.py
index 47f7ca6..cd70484 100644 (file)
@@ -32,8 +32,8 @@
 # or implied, of GRNET S.A.
 
 import archipelago
-from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Sosd, \
-        create_segment, destroy_segment, Error
+from archipelago.common import Xseg_ctx, Request, Filed, Mapperd, Vlmcd, Radosd, \
+        Error, Segment
 from archipelago.archipelago import start_peer, stop_peer
 import random as rnd
 import unittest2 as unittest
@@ -86,7 +86,7 @@ def merkle_hash(hashes):
 
 def init():
     rnd.seed()
-    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
+#    archipelago.common.BIN_DIR=os.path.join(os.getcwd(), '../../peers/user/')
     archipelago.common.LOGS_PATH=os.path.join(os.getcwd(), 'logs')
     archipelago.common.PIDFILE_PATH=os.path.join(os.getcwd(), 'pids')
     if not os.path.isdir(archipelago.common.LOGS_PATH):
@@ -97,23 +97,24 @@ def init():
     recursive_remove(archipelago.common.LOGS_PATH)
 
 class XsegTest(unittest.TestCase):
-    xseg = None
-    myport = 15
-    spec = "posix:testsegment:16:256:12".encode()
+    spec = "posix:testsegment:8:16:256:12".encode()
     blocksize = 4*1024*1024
+    segment = None
 
     def setUp(self):
+        self.segment = Segment('posix', 'testsegment', 8, 16, 256, 12)
         try:
-            create_segment(self.spec)
+            self.segment.create()
         except Exception as e:
-            destroy_segment(self.spec)
-            create_segment(self.spec)
-        self.xseg = Xseg_ctx(self.spec, self.myport)
+            self.segment.destroy()
+            self.segment.create()
+        self.xseg = Xseg_ctx(self.segment)
 
     def tearDown(self):
         if self.xseg:
             self.xseg.shutdown()
-        destroy_segment(self.spec)
+        if self.segment:
+            self.segment.destroy()
 
     @staticmethod
     def get_reply_info(size):
@@ -269,11 +270,11 @@ class XsegTest(unittest.TestCase):
             self.assertTrue(req.put())
         return send_and_evaluate
 
-    def send_write(self, dst, target, data=None, offset=0, datalen=0):
+    def send_write(self, dst, target, data=None, offset=0, datalen=0, flags=0):
         #assert datalen >= size
 #        req = self.get_req(X_WRITE, dst, target, data, size=size, offset=offset, datalen=datalen)
         req = Request.get_write_request(self.xseg, dst, target, data=data,
-                offset=offset, datalen=datalen)
+                offset=offset, datalen=datalen, flags=flags)
         req.submit()
         return req
 
@@ -421,7 +422,7 @@ class XsegTest(unittest.TestCase):
 
         return Filed(**args)
 
-    def get_sosd(self, args, clean=False):
+    def get_radosd(self, args, clean=False):
         pool = args['pool']
         import rados
         cluster = rados.Rados(conffile='/etc/ceph/ceph.conf')
@@ -431,7 +432,7 @@ class XsegTest(unittest.TestCase):
         cluster.create_pool(pool)
 
         cluster.shutdown()
-        return Sosd(**args)
+        return Radosd(**args)
 
     def get_mapperd(self, args):
         return Mapperd(**args)
@@ -626,6 +627,63 @@ class VlmcdTest(XsegTest):
             req = self.xseg.wait_requests(reqs)
             self.evaluate_req(req, data=xinfo)
             reqs.remove(req)
+            self.assertTrue(req.put())
+
+    def test_flush(self):
+        datalen = 1024
+        data = get_random_string(datalen, 16)
+        volume = "myvolume"
+        volsize = 10*1024*1024
+
+        #This may seems weird, but actually vlmcd flush, only guarantees that
+        #there are no pending operation the volume. On a volume that does not
+        #exists, this is always true, so this should succeed.
+        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+                flags=XF_FLUSH, expected=True)
+        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
+                clone_size=volsize)
+        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+                flags=XF_FLUSH)
+        self.send_and_evaluate_write(self.vlmcdport, volume, data=data,
+                serviced=datalen)
+        self.send_and_evaluate_write(self.vlmcdport, volume, data="",
+                flags=XF_FLUSH)
+
+    def test_flush2(self):
+        volume = "myvolume"
+        volsize = 10*1024*1024
+        datalen = 1024
+        data = get_random_string(datalen, 16)
+
+        self.send_and_evaluate_clone(self.mapperdport, "", clone=volume,
+                clone_size=volsize)
+        xinfo = self.get_reply_info(volsize)
+        reqs = Set([])
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data="", flags=XF_FLUSH))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        reqs.add(self.send_write(self.vlmcdport, volume, data=data))
+        while len(reqs) > 0:
+            req = self.xseg.wait_requests(reqs)
+            self.evaluate_req(req)
+            reqs.remove(req)
+            self.assertTrue(req.put())
 
     def test_hash(self):
         blocksize = self.blocksize
@@ -850,6 +908,7 @@ class MapperdTest(XsegTest):
             req = self.xseg.wait_requests(reqs)
             self.evaluate_req(req, data=xinfo)
             reqs.remove(req)
+            self.assertTrue(req.put())
 
     def test_open(self):
         volume = "myvolume"
@@ -872,6 +931,7 @@ class MapperdTest(XsegTest):
             req = self.xseg.wait_requests(reqs)
             self.evaluate_req(req)
             reqs.remove(req)
+            self.assertTrue(req.put())
 
     def test_close(self):
         volume = "myvolume"
@@ -921,6 +981,7 @@ class MapperdTest(XsegTest):
             req = self.xseg.wait_requests(reqs)
             self.evaluate_req(req, data=ret)
             reqs.remove(req)
+            self.assertTrue(req.put())
 
     def test_mapw(self):
         blocksize = self.blocksize
@@ -984,6 +1045,7 @@ class MapperdTest(XsegTest):
             req = self.xseg.wait_requests(reqs)
             self.evaluate_req(req, data=ret)
             reqs.remove(req)
+            self.assertTrue(req.put())
 
 class BlockerTest(object):
     def test_write_read(self):
@@ -1024,6 +1086,8 @@ class BlockerTest(object):
                 expected_data=data, serviced=datalen)
         self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
                 size=datalen, serviced=datalen)
+        self.send_and_evaluate_copy(self.blockerport, target, dst_target=copy_target,
+                size=datalen+1, serviced=datalen+1)
         self.send_and_evaluate_read(self.blockerport, copy_target, size=datalen,
                 expected_data=data)
 
@@ -1123,32 +1187,32 @@ class FiledTest(BlockerTest, XsegTest):
         self.send_and_evaluate_release(self.blockerport, target, force=True,
                 expected=True)
 
-class SosdTest(BlockerTest, XsegTest):
+class RadosdTest(BlockerTest, XsegTest):
     filed_args = {
-            'role': 'testsosd',
+            'role': 'testradosd',
             'spec': XsegTest.spec,
             'nr_ops': 16,
             'portno_start': 0,
             'portno_end': 0,
             'daemon': True,
             'log_level': 3,
-            'pool': 'test_sosd',
+            'pool': 'test_radosd',
             'nr_threads': 3,
             }
 
     def setUp(self):
-        super(SosdTest, self).setUp()
+        super(RadosdTest, self).setUp()
         try:
-            self.blocker = self.get_sosd(self.filed_args, clean=True)
+            self.blocker = self.get_radosd(self.filed_args, clean=True)
             self.blockerport = self.blocker.portno_start
             start_peer(self.blocker)
         except Exception as e:
-            super(SosdTest, self).tearDown()
+            super(RadosdTest, self).tearDown()
             raise e
 
     def tearDown(self):
         stop_peer(self.blocker)
-        super(SosdTest, self).tearDown()
+        super(RadosdTest, self).tearDown()
 
 if __name__=='__main__':
     init()