Send size deltas to the queue.
author     Antony Chazapis <chazapis@gmail.com>
           Mon, 13 Feb 2012 12:10:46 +0000 (14:10 +0200)
committer  Antony Chazapis <chazapis@gmail.com>
           Mon, 13 Feb 2012 12:10:46 +0000 (14:10 +0200)
Refs #1688
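
The node_purge_children, node_purge and version_remove calls now also return the total size of the versions they delete, in both the sqlalchemy and sqlite drivers. The modular backend folds these figures, together with the size returned by _apply_versioning, into a new _report_size_change helper that sends the signed size delta to the queue, replacing the earlier zero-valued 'diskspace' messages.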

pithos/backends/lib/sqlalchemy/node.py
pithos/backends/lib/sqlite/node.py
pithos/backends/modular.py
pithos/lib/queue.py

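The purge methods now return a pair instead of a bare hash list. A minimal sketch of the new calling convention, as the modular backend uses it (the variables node, until, user and account are illustrative; CLUSTER_HISTORY and _report_size_change come from modular.py):

    # Sketch: purge a node's history and account for the space it frees.
    hashes, size = self.node.node_purge(node, until, CLUSTER_HISTORY)
    for h in hashes:
        self.store.map_delete(h)             # drop the freed hashmaps
    self._report_size_change(user, account, -size,
                             {'action': 'object purge'})
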
diff --git a/pithos/backends/lib/sqlalchemy/node.py b/pithos/backends/lib/sqlalchemy/node.py
index e977057..f7e75d2 100644
@@ -267,7 +267,7 @@ class Node(DBWorker):
     def node_purge_children(self, parent, before=inf, cluster=0):
         """Delete all versions with the specified
            parent and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out nodes with no remaining versions.
         """
         #update statistics
@@ -284,7 +284,7 @@ class Node(DBWorker):
         row = r.fetchone()
         r.close()
         if not row:
-            return ()
+            return (), 0
         nr, size = row[0], -row[1] if row[1] else 0
         mtime = time()
         self.statistics_update(parent, -nr, size, mtime, cluster)
@@ -312,12 +312,12 @@ class Node(DBWorker):
         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
         self.conn.execute(s).close()
         
-        return hashes
+        return hashes, size
     
     def node_purge(self, node, before=inf, cluster=0):
         """Delete all versions with the specified
            node and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out the node if it has no remaining versions.
         """
         
@@ -334,7 +334,7 @@ class Node(DBWorker):
         nr, size = row[0], row[1]
         r.close()
         if not nr:
-            return ()
+            return (), 0
         mtime = time()
         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
         
@@ -360,7 +360,7 @@ class Node(DBWorker):
         s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
         self.conn.execute(s).close()
         
-        return hashes
+        return hashes, size
     
     def node_remove(self, node):
         """Remove the node specified.
@@ -644,7 +644,7 @@ class Node(DBWorker):
         
         s = self.versions.delete().where(self.versions.c.serial == serial)
         self.conn.execute(s).close()
-        return hash
+        return hash, size
     
     def attribute_get(self, serial, domain, keys=()):
         """Return a list of (key, value) pairs of the version specified by serial.
diff --git a/pithos/backends/lib/sqlite/node.py b/pithos/backends/lib/sqlite/node.py
index 752c3c0..aa5cb4d 100644
@@ -235,7 +235,7 @@ class Node(DBWorker):
     def node_purge_children(self, parent, before=inf, cluster=0):
         """Delete all versions with the specified
            parent and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out nodes with no remaining versions.
         """
         
@@ -250,7 +250,7 @@ class Node(DBWorker):
         execute(q, args)
         nr, size = self.fetchone()
         if not nr:
-            return ()
+            return (), 0
         mtime = time()
         self.statistics_update(parent, -nr, -size, mtime, cluster)
         self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
@@ -277,12 +277,12 @@ class Node(DBWorker):
                                    "where node = n.node) = 0 "
                             "and parent = ?)")
         execute(q, (parent,))
-        return hashes
+        return hashes, size
     
     def node_purge(self, node, before=inf, cluster=0):
         """Delete all versions with the specified
            node and cluster, and return
-           the hashes of versions deleted.
+           the hashes and size of versions deleted.
            Clears out the node if it has no remaining versions.
         """
         
@@ -295,7 +295,7 @@ class Node(DBWorker):
         execute(q, args)
         nr, size = self.fetchone()
         if not nr:
-            return ()
+            return (), 0
         mtime = time()
         self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
         
@@ -317,7 +317,7 @@ class Node(DBWorker):
                                    "where node = n.node) = 0 "
                             "and node = ?)")
         execute(q, (node,))
-        return hashes
+        return hashes, size
     
     def node_remove(self, node):
         """Remove the node specified.
@@ -549,7 +549,7 @@ class Node(DBWorker):
         
         q = "delete from versions where serial = ?"
         self.execute(q, (serial,))
-        return hash
+        return hash, size
     
     def attribute_get(self, serial, domain, keys=()):
         """Return a list of (key, value) pairs of the version specified by serial.
diff --git a/pithos/backends/modular.py b/pithos/backends/modular.py
index d41ded5..961d258 100644
@@ -385,21 +385,21 @@ class ModularBackend(BaseBackend):
         path, node = self._lookup_container(account, container)
         
         if until is not None:
-            hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+            hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge_children(node, until, CLUSTER_DELETED)
-            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+            self._report_size_change(user, account, -size, {'action': 'container purge'})
             return
         
         if self._get_statistics(node)[0] > 0:
             raise IndexError('Container is not empty')
-        hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+        hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
         for h in hashes:
             self.store.map_delete(h)
         self.node.node_purge_children(node, inf, CLUSTER_DELETED)
         self.node.node_remove(node)
-        self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+        self._report_size_change(user, account, -size, {'action': 'container delete'})
     
     @backend_method
     def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
@@ -552,11 +552,8 @@ class ModularBackend(BaseBackend):
         pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
         
         # Check quota.
-        versioning = self._get_policy(container_node)['versioning']
-        if versioning != 'auto':
-            size_delta = size - 0 # TODO: Get previous size.
-        else:
-            size_delta = size
+        del_size = self._apply_versioning(account, container, pre_version_id)
+        size_delta = size - del_size
         if size_delta > 0:
             account_quota = long(self._get_policy(account_node)['quota'])
             container_quota = long(self._get_policy(container_node)['quota'])
@@ -564,10 +561,10 @@ class ModularBackend(BaseBackend):
                (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
                 # This must be executed in a transaction, so the version is never created if it fails.
                 raise QuotaError
+        self._report_size_change(user, account, size_delta, {'action': 'object update'})
         
         if permissions is not None:
             self.permissions.access_set(path, permissions)
-        self._apply_versioning(account, container, pre_version_id)
         return pre_version_id, dest_version_id
     
     @backend_method
@@ -589,7 +586,6 @@ class ModularBackend(BaseBackend):
         pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
         self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
         self.store.map_put(hash, map)
-        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
@@ -612,7 +608,6 @@ class ModularBackend(BaseBackend):
         
         logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
-        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     @backend_method
@@ -625,7 +620,6 @@ class ModularBackend(BaseBackend):
         dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
         if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
             self._delete_object(user, src_account, src_container, src_name)
-        self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
         return dest_version_id
     
     def _delete_object(self, user, account, container, name, until=None):
@@ -637,8 +631,14 @@ class ModularBackend(BaseBackend):
             node = self.node.node_lookup(path)
             if node is None:
                 return
-            hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
-            hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
+            hashes = []
+            size = 0
+            h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+            hashes += h
+            size += s
+            h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+            hashes += h
+            size += s
             for h in hashes:
                 self.store.map_delete(h)
             self.node.node_purge(node, until, CLUSTER_DELETED)
@@ -646,12 +646,14 @@ class ModularBackend(BaseBackend):
                 props = self._get_version(node)
             except NameError:
                 self.permissions.access_clear(path)
-            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+            self._report_size_change(user, account, -size, {'action': 'object purge'})
             return
         
         path, node = self._lookup_object(account, container, name)
         src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
-        self._apply_versioning(account, container, src_version_id)
+        del_size = self._apply_versioning(account, container, src_version_id)
+        if del_size:
+            self._report_size_change(user, account, -del_size, {'action': 'object delete'})
         self.permissions.access_clear(path)
     
     @backend_method
@@ -873,6 +875,15 @@ class ModularBackend(BaseBackend):
         start, limit = self._list_limits([x[0] for x in objects], marker, limit)
         return objects[start:start + limit]
     
+    # Reporting functions.
+    
+    def _report_size_change(self, user, account, size, details={}):
+        logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
+        account_node = self._lookup_account(account, True)[1]
+        total = self._get_statistics(account_node)[1]
+        details.update({'user': user, 'total': total})
+        self.queue.send(account, 'diskspace', size, details)
+    
     # Policy functions.
     
     def _check_policy(self, policy):
@@ -903,14 +914,19 @@ class ModularBackend(BaseBackend):
         return policy
     
     def _apply_versioning(self, account, container, version_id):
+        """Delete the provided version if such is the policy.
+           Return size of object removed.
+        """
+        
         if version_id is None:
-            return
+            return 0
         path, node = self._lookup_container(account, container)
         versioning = self._get_policy(node)['versioning']
         if versioning != 'auto':
-            hash = self.node.version_remove(version_id)
+            hash, size = self.node.version_remove(version_id)
             self.store.map_delete(hash)
-            self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+            return size
+        return 0
     
     # Access control functions.
     
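With the helper in place, every report reaches the queue as one 'diskspace' message whose value is the signed delta and whose details carry the acting user and the account's recomputed total. A sketch of what _report_size_change ends up sending for a delete that frees 1024 bytes (the total shown is illustrative):

    # Roughly what _report_size_change issues after an object delete:
    self.queue.send(account, 'diskspace', -1024,
                    {'action': 'object delete',
                     'user': user,
                     'total': 5368709120})   # account usage after the delete
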
diff --git a/pithos/lib/queue.py b/pithos/lib/queue.py
index a1c08b8..837a74b 100755
@@ -93,7 +93,7 @@ def queue_start(conn):
     channel.start_consuming()
 
 class Receipt(object):
-    def __init__(self, client, user, resource, value, details=None):
+    def __init__(self, client, user, resource, value, details={}):
         self.eventVersion = 1
         self.id = str(uuid.uuid4())
         self.timestamp = int(time() * 1000)
@@ -101,8 +101,7 @@ class Receipt(object):
         self.userId = user
         self.resource = resource
         self.value = value
-        if details:
-            self.details = details
+        self.details = details
     
     def format(self):
         return self.__dict__
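
On the consumer side, Receipt now always carries a details dict (defaulting to empty), so a size-delta message like the one above could be wrapped as follows; the client value, identifiers and numbers are illustrative:

    r = Receipt(client, 'account-id', 'diskspace', -1024,
                {'action': 'object delete', 'user': 'user-id',
                 'total': 5368709120})
    body = r.format()   # plain dict including id, timestamp, userId,
                        # resource, value and (now always) details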