Include version information in aquarium messages
authorSofia Papagiannaki <papagian@gmail.com>
Thu, 13 Sep 2012 12:06:54 +0000 (15:06 +0300)
committerSofia Papagiannaki <papagian@gmail.com>
Fri, 9 Nov 2012 17:34:36 +0000 (19:34 +0200)
snf-pithos-backend/pithos/backends/lib/sqlalchemy/node.py
snf-pithos-backend/pithos/backends/lib/sqlite/node.py
snf-pithos-backend/pithos/backends/modular.py
snf-pithos-tools/pithos/tools/test.py

index cfa96b3..2efcd17 100644 (file)
@@ -330,10 +330,14 @@ class Node(DBWorker):
         self.statistics_update(parent, -nr, size, mtime, cluster)
         self.statistics_update_ancestors(parent, -nr, size, mtime, cluster)
 
-        s = select([self.versions.c.hash])
+        s = select([self.versions.c.hash, self.versions.c.serial])
         s = s.where(where_clause)
         r = self.conn.execute(s)
-        hashes = [row[0] for row in r.fetchall()]
+        hashes = []
+        serials = []
+        for row in r.fetchall():
+            hashes += [row[0]]
+            serials += [row[1]]
         r.close()
 
         #delete versions
@@ -352,7 +356,7 @@ class Node(DBWorker):
         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
         self.conn.execute(s).close()
 
-        return hashes, size
+        return hashes, size, serials
 
     def node_purge(self, node, before=inf, cluster=0):
         """Delete all versions with the specified
@@ -378,10 +382,14 @@ class Node(DBWorker):
         mtime = time()
         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
 
-        s = select([self.versions.c.hash])
+        s = select([self.versions.c.hash, self.versions.c.serial])
         s = s.where(where_clause)
         r = self.conn.execute(s)
-        hashes = [r[0] for r in r.fetchall()]
+        hashes = []
+        serials = []
+        for row in r.fetchall():
+            hashes += [row[0]]
+            serials += [row[1]]
         r.close()
 
         #delete versions
@@ -400,7 +408,7 @@ class Node(DBWorker):
         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
         self.conn.execute(s).close()
 
-        return hashes, size
+        return hashes, size, serials
 
     def node_remove(self, node):
         """Remove the node specified.
index 3987335..ed4c6b0 100644 (file)
@@ -286,7 +286,12 @@ class Node(DBWorker):
              "and cluster = ? "
              "and mtime <= ?")
         execute(q, args)
-        hashes = [r[0] for r in self.fetchall()]
+        hashes = []
+        serials = []
+        for r in self.fetchall():
+            hashes += [r[0]]
+            serials += [r[1]]
+        
         q = ("delete from versions "
              "where node in (select node "
              "from nodes "
@@ -301,7 +306,7 @@ class Node(DBWorker):
              "where node = n.node) = 0 "
              "and parent = ?)")
         execute(q, (parent,))
-        return hashes, size
+        return hashes, size, serials
 
     def node_purge(self, node, before=inf, cluster=0):
         """Delete all versions with the specified
@@ -328,7 +333,12 @@ class Node(DBWorker):
              "and cluster = ? "
              "and mtime <= ?")
         execute(q, args)
-        hashes = [r[0] for r in self.fetchall()]
+        hashes = []
+        serials = []
+        for r in self.fetchall():
+            hashes += [r[0]]
+            serials += [r[1]]
+        
         q = ("delete from versions "
              "where node = ? "
              "and cluster = ? "
@@ -341,7 +351,7 @@ class Node(DBWorker):
              "where node = n.node) = 0 "
              "and node = ?)")
         execute(q, (node,))
-        return hashes, size
+        return hashes, size, serials
 
     def node_remove(self, node):
         """Remove the node specified.
index 679603f..28495dc 100644 (file)
@@ -483,26 +483,29 @@ class ModularBackend(BaseBackend):
         path, node = self._lookup_container(account, container)
 
         if until is not None:
-            hashes, size = self.node.node_purge_children(
+            hashes, size, serials = self.node.node_purge_children(
                 node, until, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
-            self._report_size_change(user, account, -size, {'action':
-                                     'container purge', 'path': path})
+            self._report_size_change(user, account, -size,
+                                                        {'action':'container purge', 'path': path,
+                                                         'versions': serials})
             return
 
         if not delimiter:
             if self._get_statistics(node)[0] > 0:
                 raise ContainerNotEmpty('Container is not empty')
-            hashes, size = self.node.node_purge_children(
+            hashes, size, serials = self.node.node_purge_children(
                 node, inf, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, inf, CLUSTER_DELETED)
             self.node.node_remove(node)
-            self._report_size_change(user, account, -size, {'action':
-                                     'container delete', 'path': path})
+            self._report_size_change(user, account, -size,
+                                                        {'action': 'container delete',
+                                                         'path': path,
+                                                         'versions': serials})
         else:
                 # remove only contents
             src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False)
@@ -514,7 +517,9 @@ class ModularBackend(BaseBackend):
                 del_size = self._apply_versioning(
                     account, container, src_version_id)
                 if del_size:
-                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+                    self._report_size_change(user, account, -del_size,
+                                                                {'action': 'object delete',
+                                                                 'path': path, 'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -804,8 +809,9 @@ class ModularBackend(BaseBackend):
                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
-        self._report_size_change(user, account, size_delta, {
-                                 'action': 'object update', 'path': path})
+        self._report_size_change(user, account, size_delta,
+                                                        {'action': 'object update', 'path': path,
+                                                         'versions': [dest_version_id]})
 
         if permissions is not None:
             self.permissions.access_set(path, permissions)
@@ -919,12 +925,15 @@ class ModularBackend(BaseBackend):
                 return
             hashes = []
             size = 0
-            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            serials = []
+            h, s, v = self.node.node_purge(node, until, CLUSTER_NORMAL)
             hashes += h
             size += s
-            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+            serials += v
+            h, s, v = self.node.node_purge(node, until, CLUSTER_HISTORY)
             hashes += h
             size += s
+            serials += v
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge(node, until, CLUSTER_DELETED)
@@ -932,16 +941,18 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
-            self._report_size_change(user, account, -size, {
-                                     'action': 'object purge', 'path': path})
+            self._report_size_change(user, account, -size,
+                                                       {'action': 'object purge', 'path': path,
+                                                        'versions': serials})
             return
 
         path, node = self._lookup_object(account, container, name)
         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED)
         del_size = self._apply_versioning(account, container, src_version_id)
         if del_size:
-            self._report_size_change(user, account, -del_size, {
-                                     'action': 'object delete', 'path': path})
+            self._report_size_change(user, account, -del_size,
+                                                        {'action': 'object delete', 'path': path,
+                                                         'versions': [dest_version_id]})
         self._report_object_change(
             user, account, path, details={'action': 'object delete'})
         self.permissions.access_clear(path)
@@ -957,7 +968,10 @@ class ModularBackend(BaseBackend):
                 del_size = self._apply_versioning(
                     account, container, src_version_id)
                 if del_size:
-                    self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path': path})
+                    self._report_size_change(user, account, -del_size,
+                                                                {'action': 'object delete',
+                                                                 'path': path,
+                                                                 'versions': [dest_version_id]})
                 self._report_object_change(
                     user, account, path, details={'action': 'object delete'})
                 paths.append(path)
@@ -1215,11 +1229,8 @@ class ModularBackend(BaseBackend):
         logger.debug(
             "_report_size_change: %s %s %s %s", user, account, size, details)
         self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), 
-                                                 account,
-                                                 QUEUE_INSTANCE_ID,
-                                                 'diskspace',
-                                                 float(size),
-                                                 details))
+                                                 account, QUEUE_INSTANCE_ID, 'diskspace',
+                                                 float(size), details))
 
     def _report_object_change(self, user, account, path, details={}):
         details.update({'user': user})
index b1056ae..ddb8518 100755 (executable)
@@ -59,16 +59,10 @@ DATE_FORMATS = ["%a %b %d %H:%M:%S %Y",
                 "%A, %d-%b-%y %H:%M:%S GMT",
                 "%a, %d %b %Y %H:%M:%S GMT"]
 
-OTHER_ACCOUNTS = {
-    '0001': 'verigak',
-    '0002': 'chazapis',
-    '0003': 'gtsouk',
-    '0004': 'papagian',
-    '0005': 'louridas',
-    '0006': 'chstath',
-    '0007': 'pkanavos',
-    '0008': 'mvasilak',
-    '0009': 'διογένης'}
+from pithos.api.settings import AUTHENTICATION_USERS
+AUTHENTICATION_USERS = AUTHENTICATION_USERS or {}
+OTHER_ACCOUNTS = AUTHENTICATION_USERS.copy()
+OTHER_ACCOUNTS.pop(get_auth())
 
 class BaseTestCase(unittest.TestCase):
     #TODO unauthorized request
@@ -123,7 +117,7 @@ class BaseTestCase(unittest.TestCase):
 
     def _clean_account(self):
         for c in self.client.list_containers():
-            if c not in self.initial_containers:
+#             if c not in self.initial_containers:
                 self.client.delete_container(c, delimiter='/')
                 self.client.delete_container(c)
     
@@ -298,7 +292,7 @@ class AccountHead(BaseTestCase):
 
     def test_get_account_meta_until(self):
         t = datetime.datetime.utcnow()
-        past = t - datetime.timedelta(minutes=-15)
+        past = t - datetime.timedelta(minutes=15)
         past = int(_time.mktime(past.timetuple()))
 
         meta = {'premium':True}
@@ -913,13 +907,12 @@ class ObjectGet(BaseTestCase):
         v_meta = self.client.retrieve_object_metadata(c, o['name'],
                                                       restricted=True,
                                                       version=v)
-        for k in meta.keys():
-            self.assertTrue(k not in v_meta)
-
+        (self.assertTrue(k not in v_meta) for k in meta.keys())
+        
         #update obejct
         data = get_random_data()
         self.client.update_object(c, o['name'], StringIO(data))
-
+        
         aa = self.client.retrieve_object_versionlist(c, o['name'])['versions']
         self.assert_versionlist_structure(aa)
         self.assertEqual(len(a)+1, len(aa))
@@ -1862,18 +1855,20 @@ class ListSharing(BaseTestCase):
         for i in range(2):
             self.upload_random_data('c1', 'o%s' %i)
         accounts = OTHER_ACCOUNTS.copy()
-        self.o1_sharing_with = accounts.popitem()
-        self.o1_sharing = [self.o1_sharing_with[1]]
-        self.client.share_object('c1', 'o1', self.o1_sharing, read=True)
-
+        self.o1_sharing = accounts.popitem()
+        self.client.share_object('c1', 'o1', (self.o1_sharing[1],), read=True)
+        
         l = []
-        for i in range(2):
+        for i in range(len(OTHER_ACCOUNTS) - 1):
             l.append(accounts.popitem())
-
+    
+    def tearDown(self):
+        pass
+    
     def test_list_other_shared(self):
         self.other = Pithos_Client(get_url(),
-                              self.o1_sharing_with[0],
-                              self.o1_sharing_with[1])
+                              self.o1_sharing[0],
+                              self.o1_sharing[1])
         self.assertTrue(get_user() in self.other.list_shared_by_others())
 
     def test_list_my_shared(self):