From 813e42e545fc17b7dfdbd0b2b1d2ee8f95b8bfdd Mon Sep 17 00:00:00 2001 From: Antony Chazapis Date: Mon, 13 Feb 2012 14:10:46 +0200 Subject: [PATCH] Send size deltas to the queue. Refs #1688 --- pithos/backends/lib/sqlalchemy/node.py | 14 ++++---- pithos/backends/lib/sqlite/node.py | 14 ++++---- pithos/backends/modular.py | 56 ++++++++++++++++++++------------ pithos/lib/queue.py | 5 ++- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/pithos/backends/lib/sqlalchemy/node.py b/pithos/backends/lib/sqlalchemy/node.py index e977057..f7e75d2 100644 --- a/pithos/backends/lib/sqlalchemy/node.py +++ b/pithos/backends/lib/sqlalchemy/node.py @@ -267,7 +267,7 @@ class Node(DBWorker): def node_purge_children(self, parent, before=inf, cluster=0): """Delete all versions with the specified parent and cluster, and return - the hashes of versions deleted. + the hashes and size of versions deleted. Clears out nodes with no remaining versions. """ #update statistics @@ -284,7 +284,7 @@ class Node(DBWorker): row = r.fetchone() r.close() if not row: - return () + return (), 0 nr, size = row[0], -row[1] if row[1] else 0 mtime = time() self.statistics_update(parent, -nr, size, mtime, cluster) @@ -312,12 +312,12 @@ class Node(DBWorker): s = self.nodes.delete().where(self.nodes.c.node.in_(nodes)) self.conn.execute(s).close() - return hashes + return hashes, size def node_purge(self, node, before=inf, cluster=0): """Delete all versions with the specified node and cluster, and return - the hashes of versions deleted. + the hashes and size of versions deleted. Clears out the node if it has no remaining versions. """ @@ -334,7 +334,7 @@ class Node(DBWorker): nr, size = row[0], row[1] r.close() if not nr: - return () + return (), 0 mtime = time() self.statistics_update_ancestors(node, -nr, -size, mtime, cluster) @@ -360,7 +360,7 @@ class Node(DBWorker): s = self.nodes.delete().where(self.nodes.c.node.in_(nodes)) self.conn.execute(s).close() - return hashes + return hashes, size def node_remove(self, node): """Remove the node specified. @@ -644,7 +644,7 @@ class Node(DBWorker): s = self.versions.delete().where(self.versions.c.serial == serial) self.conn.execute(s).close() - return hash + return hash, size def attribute_get(self, serial, domain, keys=()): """Return a list of (key, value) pairs of the version specified by serial. diff --git a/pithos/backends/lib/sqlite/node.py b/pithos/backends/lib/sqlite/node.py index 752c3c0..aa5cb4d 100644 --- a/pithos/backends/lib/sqlite/node.py +++ b/pithos/backends/lib/sqlite/node.py @@ -235,7 +235,7 @@ class Node(DBWorker): def node_purge_children(self, parent, before=inf, cluster=0): """Delete all versions with the specified parent and cluster, and return - the hashes of versions deleted. + the hashes and size of versions deleted. Clears out nodes with no remaining versions. """ @@ -250,7 +250,7 @@ class Node(DBWorker): execute(q, args) nr, size = self.fetchone() if not nr: - return () + return (), 0 mtime = time() self.statistics_update(parent, -nr, -size, mtime, cluster) self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster) @@ -277,12 +277,12 @@ class Node(DBWorker): "where node = n.node) = 0 " "and parent = ?)") execute(q, (parent,)) - return hashes + return hashes, size def node_purge(self, node, before=inf, cluster=0): """Delete all versions with the specified node and cluster, and return - the hashes of versions deleted. + the hashes and size of versions deleted. Clears out the node if it has no remaining versions. """ @@ -295,7 +295,7 @@ class Node(DBWorker): execute(q, args) nr, size = self.fetchone() if not nr: - return () + return (), 0 mtime = time() self.statistics_update_ancestors(node, -nr, -size, mtime, cluster) @@ -317,7 +317,7 @@ class Node(DBWorker): "where node = n.node) = 0 " "and node = ?)") execute(q, (node,)) - return hashes + return hashes, size def node_remove(self, node): """Remove the node specified. @@ -549,7 +549,7 @@ class Node(DBWorker): q = "delete from versions where serial = ?" self.execute(q, (serial,)) - return hash + return hash, size def attribute_get(self, serial, domain, keys=()): """Return a list of (key, value) pairs of the version specified by serial. diff --git a/pithos/backends/modular.py b/pithos/backends/modular.py index d41ded5..961d258 100644 --- a/pithos/backends/modular.py +++ b/pithos/backends/modular.py @@ -385,21 +385,21 @@ class ModularBackend(BaseBackend): path, node = self._lookup_container(account, container) if until is not None: - hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY) + hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY) for h in hashes: self.store.map_delete(h) self.node.node_purge_children(node, until, CLUSTER_DELETED) - self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0}) + self._report_size_change(user, account, -size, {'action': 'container purge'}) return if self._get_statistics(node)[0] > 0: raise IndexError('Container is not empty') - hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) + hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) for h in hashes: self.store.map_delete(h) self.node.node_purge_children(node, inf, CLUSTER_DELETED) self.node.node_remove(node) - self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0}) + self._report_size_change(user, account, -size, {'action': 'container delete'}) @backend_method def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): @@ -552,11 +552,8 @@ class ModularBackend(BaseBackend): pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy) # Check quota. - versioning = self._get_policy(container_node)['versioning'] - if versioning != 'auto': - size_delta = size - 0 # TODO: Get previous size. - else: - size_delta = size + del_size = self._apply_versioning(account, container, pre_version_id) + size_delta = size - del_size if size_delta > 0: account_quota = long(self._get_policy(account_node)['quota']) container_quota = long(self._get_policy(container_node)['quota']) @@ -564,10 +561,10 @@ class ModularBackend(BaseBackend): (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota): # This must be executed in a transaction, so the version is never created if it fails. raise QuotaError + self._report_size_change(user, account, size_delta, {'action': 'object update'}) if permissions is not None: self.permissions.access_set(path, permissions) - self._apply_versioning(account, container, pre_version_id) return pre_version_id, dest_version_id @backend_method @@ -589,7 +586,6 @@ class ModularBackend(BaseBackend): pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions) self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta) self.store.map_put(hash, map) - self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0}) return dest_version_id def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False): @@ -612,7 +608,6 @@ class ModularBackend(BaseBackend): logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version) dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False) - self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0}) return dest_version_id @backend_method @@ -625,7 +620,6 @@ class ModularBackend(BaseBackend): dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True) if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): self._delete_object(user, src_account, src_container, src_name) - self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0}) return dest_version_id def _delete_object(self, user, account, container, name, until=None): @@ -637,8 +631,14 @@ class ModularBackend(BaseBackend): node = self.node.node_lookup(path) if node is None: return - hashes = self.node.node_purge(node, until, CLUSTER_NORMAL) - hashes += self.node.node_purge(node, until, CLUSTER_HISTORY) + hashes = [] + size = 0 + h, s = self.node.node_purge(node, until, CLUSTER_NORMAL) + hashes += h + size += s + h, s = self.node.node_purge(node, until, CLUSTER_HISTORY) + hashes += h + size += s for h in hashes: self.store.map_delete(h) self.node.node_purge(node, until, CLUSTER_DELETED) @@ -646,12 +646,14 @@ class ModularBackend(BaseBackend): props = self._get_version(node) except NameError: self.permissions.access_clear(path) - self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0}) + self._report_size_change(user, account, -size, {'action': 'object purge'}) return path, node = self._lookup_object(account, container, name) src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED) - self._apply_versioning(account, container, src_version_id) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete'}) self.permissions.access_clear(path) @backend_method @@ -873,6 +875,15 @@ class ModularBackend(BaseBackend): start, limit = self._list_limits([x[0] for x in objects], marker, limit) return objects[start:start + limit] + # Reporting functions. + + def _report_size_change(self, user, account, size, details={}): + logger.debug("_report_size_change: %s %s %s %s", user, account, size, details) + account_node = self._lookup_account(account, True)[1] + total = self._get_statistics(account_node)[1] + details.update({'user': user, 'total': total}) + self.queue.send(account, 'diskspace', size, details) + # Policy functions. def _check_policy(self, policy): @@ -903,14 +914,19 @@ class ModularBackend(BaseBackend): return policy def _apply_versioning(self, account, container, version_id): + """Delete the provided version if such is the policy. + Return size of object removed. + """ + if version_id is None: - return + return 0 path, node = self._lookup_container(account, container) versioning = self._get_policy(node)['versioning'] if versioning != 'auto': - hash = self.node.version_remove(version_id) + hash, size = self.node.version_remove(version_id) self.store.map_delete(hash) - self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0}) + return size + return 0 # Access control functions. diff --git a/pithos/lib/queue.py b/pithos/lib/queue.py index a1c08b8..837a74b 100755 --- a/pithos/lib/queue.py +++ b/pithos/lib/queue.py @@ -93,7 +93,7 @@ def queue_start(conn): channel.start_consuming() class Receipt(object): - def __init__(self, client, user, resource, value, details=None): + def __init__(self, client, user, resource, value, details={}): self.eventVersion = 1 self.id = str(uuid.uuid4()) self.timestamp = int(time() * 1000) @@ -101,8 +101,7 @@ class Receipt(object): self.userId = user self.resource = resource self.value = value - if details: - self.details = details + self.details = details def format(self): return self.__dict__ -- 1.7.10.4