def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out nodes with no remaining versions.
"""
#update statistics
row = r.fetchone()
r.close()
if not row:
- return ()
+ return (), 0
nr, size = row[0], -row[1] if row[1] else 0
mtime = time()
self.statistics_update(parent, -nr, size, mtime, cluster)
s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
self.conn.execute(s).close()
- return hashes
+ return hashes, size
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out the node if it has no remaining versions.
"""
nr, size = row[0], row[1]
r.close()
if not nr:
- return ()
+ return (), 0
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
s = self.nodes.delete().where(self.nodes.c.node.in_(nodes))
self.conn.execute(s).close()
- return hashes
+ return hashes, size
def node_remove(self, node):
"""Remove the node specified.
s = self.versions.delete().where(self.versions.c.serial == serial)
self.conn.execute(s).close()
- return hash
+ return hash, size
def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
def node_purge_children(self, parent, before=inf, cluster=0):
"""Delete all versions with the specified
parent and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out nodes with no remaining versions.
"""
execute(q, args)
nr, size = self.fetchone()
if not nr:
- return ()
+ return (), 0
mtime = time()
self.statistics_update(parent, -nr, -size, mtime, cluster)
self.statistics_update_ancestors(parent, -nr, -size, mtime, cluster)
"where node = n.node) = 0 "
"and parent = ?)")
execute(q, (parent,))
- return hashes
+ return hashes, size
def node_purge(self, node, before=inf, cluster=0):
"""Delete all versions with the specified
node and cluster, and return
- the hashes of versions deleted.
+ the hashes and size of versions deleted.
Clears out the node if it has no remaining versions.
"""
execute(q, args)
nr, size = self.fetchone()
if not nr:
- return ()
+ return (), 0
mtime = time()
self.statistics_update_ancestors(node, -nr, -size, mtime, cluster)
"where node = n.node) = 0 "
"and node = ?)")
execute(q, (node,))
- return hashes
+ return hashes, size
def node_remove(self, node):
"""Remove the node specified.
q = "delete from versions where serial = ?"
self.execute(q, (serial,))
- return hash
+ return hash, size
def attribute_get(self, serial, domain, keys=()):
"""Return a list of (key, value) pairs of the version specified by serial.
path, node = self._lookup_container(account, container)
if until is not None:
- hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
+ hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
- self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+ self._report_size_change(user, account, -size, {'action': 'container purge'})
return
if self._get_statistics(node)[0] > 0:
raise IndexError('Container is not empty')
- hashes = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
+ hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
for h in hashes:
self.store.map_delete(h)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
- self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+ self._report_size_change(user, account, -size, {'action': 'container delete'})
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None):
pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy)
# Check quota.
- versioning = self._get_policy(container_node)['versioning']
- if versioning != 'auto':
- size_delta = size - 0 # TODO: Get previous size.
- else:
- size_delta = size
+ del_size = self._apply_versioning(account, container, pre_version_id)
+ size_delta = size - del_size
if size_delta > 0:
account_quota = long(self._get_policy(account_node)['quota'])
container_quota = long(self._get_policy(container_node)['quota'])
(container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
+ self._report_size_change(user, account, size_delta, {'action': 'object update'})
if permissions is not None:
self.permissions.access_set(path, permissions)
- self._apply_versioning(account, container, pre_version_id)
return pre_version_id, dest_version_id
@backend_method
pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
- self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False):
logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False)
- self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
@backend_method
dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True)
if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
self._delete_object(user, src_account, src_container, src_name)
- self.queue.send(user, 'diskspace', 0, {'action': 'add', 'version': dest_version_id, 'total': 0})
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
node = self.node.node_lookup(path)
if node is None:
return
- hashes = self.node.node_purge(node, until, CLUSTER_NORMAL)
- hashes += self.node.node_purge(node, until, CLUSTER_HISTORY)
+ hashes = []
+ size = 0
+ h, s = self.node.node_purge(node, until, CLUSTER_NORMAL)
+ hashes += h
+ size += s
+ h, s = self.node.node_purge(node, until, CLUSTER_HISTORY)
+ hashes += h
+ size += s
for h in hashes:
self.store.map_delete(h)
self.node.node_purge(node, until, CLUSTER_DELETED)
props = self._get_version(node)
except NameError:
self.permissions.access_clear(path)
- self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+ self._report_size_change(user, account, -size, {'action': 'object purge'})
return
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED)
- self._apply_versioning(account, container, src_version_id)
+ del_size = self._apply_versioning(account, container, src_version_id)
+ if del_size:
+ self._report_size_change(user, account, -del_size, {'action': 'object delete'})
self.permissions.access_clear(path)
@backend_method
start, limit = self._list_limits([x[0] for x in objects], marker, limit)
return objects[start:start + limit]
+ # Reporting functions.
+
+ def _report_size_change(self, user, account, size, details={}):
+ logger.debug("_report_size_change: %s %s %s %s", user, account, size, details)
+ account_node = self._lookup_account(account, True)[1]
+ total = self._get_statistics(account_node)[1]
+ details.update({'user': user, 'total': total})
+ self.queue.send(account, 'diskspace', size, details)
+
# Policy functions.
def _check_policy(self, policy):
return policy
def _apply_versioning(self, account, container, version_id):
+ """Delete the provided version if such is the policy.
+ Return size of object removed.
+ """
+
if version_id is None:
- return
+ return 0
path, node = self._lookup_container(account, container)
versioning = self._get_policy(node)['versioning']
if versioning != 'auto':
- hash = self.node.version_remove(version_id)
+ hash, size = self.node.version_remove(version_id)
self.store.map_delete(hash)
- self.queue.send(user, 'diskspace', 0, {'action': 'delete', 'total': 0})
+ return size
+ return 0
# Access control functions.