X-Git-Url: https://code.grnet.gr/git/pithos/blobdiff_plain/39ef6f4143cb101fbf31069de3b402e9e1e0d82e..778318d1090e7002194865545e9e4af8b9443ce2:/snf-pithos-backend/pithos/backends/modular.py diff --git a/snf-pithos-backend/pithos/backends/modular.py b/snf-pithos-backend/pithos/backends/modular.py index 6f6b345..967eb02 100644 --- a/snf-pithos-backend/pithos/backends/modular.py +++ b/snf-pithos-backend/pithos/backends/modular.py @@ -39,7 +39,8 @@ import logging import hashlib import binascii -from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend +from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \ + AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists # Stripped-down version of the HashMap class found in tools. class HashMap(list): @@ -74,11 +75,13 @@ DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy' DEFAULT_DB_CONNECTION = 'sqlite:///backend.db' DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler' DEFAULT_BLOCK_PATH = 'data/' +DEFAULT_BLOCK_UMASK = 0o022 #DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' #DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' -QUEUE_MESSAGE_KEY = 'pithos' +QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s' QUEUE_CLIENT_ID = 'pithos' +QUEUE_INSTANCE_ID = '1' ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) @@ -103,9 +106,9 @@ def backend_method(func=None, autocommit=1): try: self.messages = [] ret = func(self, *args, **kw) - self.wrapper.commit() for m in self.messages: self.queue.send(*m) + self.wrapper.commit() return ret except: self.wrapper.rollback() @@ -120,12 +123,13 @@ class ModularBackend(BaseBackend): """ def __init__(self, db_module=None, db_connection=None, - block_module=None, block_path=None, + block_module=None, block_path=None, block_umask=None, queue_module=None, queue_connection=None): db_module = db_module or DEFAULT_DB_MODULE db_connection = db_connection or DEFAULT_DB_CONNECTION block_module = block_module or DEFAULT_BLOCK_MODULE block_path = block_path or DEFAULT_BLOCK_PATH + block_umask = block_umask or DEFAULT_BLOCK_UMASK #queue_module = queue_module or DEFAULT_QUEUE_MODULE #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION @@ -151,13 +155,13 @@ class ModularBackend(BaseBackend): self.block_module = load_module(block_module) params = {'path': block_path, 'block_size': self.block_size, - 'hash_algorithm': self.hash_algorithm} + 'hash_algorithm': self.hash_algorithm, + 'umask': block_umask} self.store = self.block_module.Store(**params) if queue_module and queue_connection: self.queue_module = load_module(queue_module) params = {'exchange': queue_connection, - 'message_key': QUEUE_MESSAGE_KEY, 'client_id': QUEUE_CLIENT_ID} self.queue = self.queue_module.Queue(**params) else: @@ -187,7 +191,7 @@ class ModularBackend(BaseBackend): def get_account_meta(self, user, account, domain, until=None, include_user_defined=True): """Return a dictionary with the account metadata for the domain.""" - logger.debug("get_account_meta: %s %s %s", account, domain, until) + logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until) path, node = self._lookup_account(account, user == account) if user != account: if until or node is None or account not in self._allowed_accounts(user): @@ -222,7 +226,7 @@ class ModularBackend(BaseBackend): def update_account_meta(self, user, account, domain, meta, replace=False): """Update the metadata associated with the account for the domain.""" - logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace) + logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace) if user != account: raise NotAllowedError path, node = self._lookup_account(account, True) @@ -232,7 +236,7 @@ class ModularBackend(BaseBackend): def get_account_groups(self, user, account): """Return a dictionary with the user groups defined for this account.""" - logger.debug("get_account_groups: %s", account) + logger.debug("get_account_groups: %s %s", user, account) if user != account: if account not in self._allowed_accounts(user): raise NotAllowedError @@ -244,7 +248,7 @@ class ModularBackend(BaseBackend): def update_account_groups(self, user, account, groups, replace=False): """Update the groups associated with the account.""" - logger.debug("update_account_groups: %s %s %s", account, groups, replace) + logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace) if user != account: raise NotAllowedError self._lookup_account(account, True) @@ -261,7 +265,7 @@ class ModularBackend(BaseBackend): def get_account_policy(self, user, account): """Return a dictionary with the account policy.""" - logger.debug("get_account_policy: %s", account) + logger.debug("get_account_policy: %s %s", user, account) if user != account: if account not in self._allowed_accounts(user): raise NotAllowedError @@ -273,7 +277,7 @@ class ModularBackend(BaseBackend): def update_account_policy(self, user, account, policy, replace=False): """Update the policy associated with the account.""" - logger.debug("update_account_policy: %s %s %s", account, policy, replace) + logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace) if user != account: raise NotAllowedError path, node = self._lookup_account(account, True) @@ -284,12 +288,12 @@ class ModularBackend(BaseBackend): def put_account(self, user, account, policy={}): """Create a new account with the given name.""" - logger.debug("put_account: %s %s", account, policy) + logger.debug("put_account: %s %s %s", user, account, policy) if user != account: raise NotAllowedError node = self.node.node_lookup(account) if node is not None: - raise NameError('Account already exists') + raise AccountExists('Account already exists') if policy: self._check_policy(policy) node = self._put_path(user, self.ROOTNODE, account) @@ -299,40 +303,46 @@ class ModularBackend(BaseBackend): def delete_account(self, user, account): """Delete the account with the given name.""" - logger.debug("delete_account: %s", account) + logger.debug("delete_account: %s %s", user, account) if user != account: raise NotAllowedError node = self.node.node_lookup(account) if node is None: return if not self.node.node_remove(node): - raise IndexError('Account is not empty') + raise AccountNotEmpty('Account is not empty') self.permissions.group_destroy(account) @backend_method - def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None): + def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False): """Return a list of containers existing under an account.""" - logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until) + logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public) if user != account: if until or account not in self._allowed_accounts(user): raise NotAllowedError allowed = self._allowed_containers(user, account) start, limit = self._list_limits(allowed, marker, limit) return allowed[start:start + limit] - if shared: - allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)] - allowed = list(set(allowed)) + if shared or public: + allowed = set() + if shared: + allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]) + if public: + allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)]) + allowed = sorted(allowed) start, limit = self._list_limits(allowed, marker, limit) return allowed[start:start + limit] node = self.node.node_lookup(account) - return [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)] + containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)] + start, limit = self._list_limits([x[0] for x in containers], marker, limit) + return containers[start:start + limit] @backend_method def list_container_meta(self, user, account, container, domain, until=None): """Return a list with all the container's object meta keys for the domain.""" - logger.debug("list_container_meta: %s %s %s %s", account, container, domain, until) + logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until) allowed = [] if user != account: if until: @@ -349,7 +359,7 @@ class ModularBackend(BaseBackend): def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True): """Return a dictionary with the container metadata for the domain.""" - logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until) + logger.debug("get_container_meta: %s %s %s %s %s", user, account, container, domain, until) if user != account: if until or container not in self._allowed_containers(user, account): raise NotAllowedError @@ -380,7 +390,7 @@ class ModularBackend(BaseBackend): def update_container_meta(self, user, account, container, domain, meta, replace=False): """Update the metadata associated with the container for the domain.""" - logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace) + logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) @@ -394,7 +404,7 @@ class ModularBackend(BaseBackend): def get_container_policy(self, user, account, container): """Return a dictionary with the container policy.""" - logger.debug("get_container_policy: %s %s", account, container) + logger.debug("get_container_policy: %s %s %s", user, account, container) if user != account: if container not in self._allowed_containers(user, account): raise NotAllowedError @@ -406,7 +416,7 @@ class ModularBackend(BaseBackend): def update_container_policy(self, user, account, container, policy, replace=False): """Update the policy associated with the container.""" - logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace) + logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) @@ -417,7 +427,7 @@ class ModularBackend(BaseBackend): def put_container(self, user, account, container, policy={}): """Create a new container with the given name.""" - logger.debug("put_container: %s %s %s", account, container, policy) + logger.debug("put_container: %s %s %s %s", user, account, container, policy) if user != account: raise NotAllowedError try: @@ -425,7 +435,7 @@ class ModularBackend(BaseBackend): except NameError: pass else: - raise NameError('Container already exists') + raise ContainerExists('Container already exists') if policy: self._check_policy(policy) path = '/'.join((account, container)) @@ -433,10 +443,10 @@ class ModularBackend(BaseBackend): self._put_policy(node, policy, True) @backend_method - def delete_container(self, user, account, container, until=None): + def delete_container(self, user, account, container, until=None, prefix='', delimiter=None): """Delete/purge the container with the given name.""" - logger.debug("delete_container: %s %s %s", account, container, until) + logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) @@ -446,29 +456,88 @@ class ModularBackend(BaseBackend): for h in hashes: self.store.map_delete(h) self.node.node_purge_children(node, until, CLUSTER_DELETED) - self._report_size_change(user, account, -size, {'action': 'container purge'}) + self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path}) return - if self._get_statistics(node)[0] > 0: - raise IndexError('Container is not empty') - hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) - for h in hashes: - self.store.map_delete(h) - self.node.node_purge_children(node, inf, CLUSTER_DELETED) - self.node.node_remove(node) - self._report_size_change(user, account, -size, {'action': 'container delete'}) - - def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props): + if not delimiter: + if self._get_statistics(node)[0] > 0: + raise ContainerNotEmpty('Container is not empty') + hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) + for h in hashes: + self.store.map_delete(h) + self.node.node_purge_children(node, inf, CLUSTER_DELETED) + self.node.node_remove(node) + self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path}) + else: + # remove only contents + src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + paths = [] + for t in src_names: + path = '/'.join((account, container, t[0])) + node = t[2] + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) + self._report_object_change(user, account, path, details={'action': 'object delete'}) + paths.append(path) + self.permissions.access_clear_bulk(paths) + + def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public): if user != account and until: raise NotAllowedError - allowed = self._list_object_permissions(user, account, container, prefix, shared) + if shared and public: + # get shared first + shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False) + objects = set() + if shared: + path, node = self._lookup_container(account, container) + shared = self._get_formatted_paths(shared) + objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, shared, all_props)) + + # get public + objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props)) + objects = list(objects) + + objects.sort(key=lambda x: x[0]) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] + elif public: + objects = self._list_public_object_properties(user, account, container, prefix, all_props) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] + + allowed = self._list_object_permissions(user, account, container, prefix, shared, public) if shared and not allowed: return [] path, node = self._lookup_container(account, container) allowed = self._get_formatted_paths(allowed) - return self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props) + objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] - def _list_object_permissions(self, user, account, container, prefix, shared): + def _list_public_object_properties(self, user, account, container, prefix, all_props): + public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True) + paths, nodes = self._lookup_objects(public) + path = '/'.join((account, container)) + cont_prefix = path + '/' + paths = [x[len(cont_prefix):] for x in paths] + props = self.node.version_lookup_bulk(nodes, all_props=all_props) + objects = [(path,) + props for path, props in zip(paths, props)] + return objects + + def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public): + objects = [] + while True: + marker = objects[-1] if objects else None + limit = 10000 + l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public) + objects.extend(l) + if not l or len(l) < limit: + break + return objects + + def _list_object_permissions(self, user, account, container, prefix, shared, public): allowed = [] path = '/'.join((account, container, prefix)).rstrip('/') if user != account: @@ -476,25 +545,29 @@ class ModularBackend(BaseBackend): if not allowed: raise NotAllowedError else: + allowed = set() if shared: - allowed = self.permissions.access_list_shared(path) - if not allowed: - return [] + allowed |= set(self.permissions.access_list_shared(path)) + if public: + allowed |= set([x[0] for x in self.permissions.public_list(path)]) + allowed = sorted(allowed) + if not allowed: + return [] return allowed @backend_method - def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): + def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False): """Return a list of object (name, version_id) tuples existing under a container.""" - logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range) - return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False) + logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public) + return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public) @backend_method - def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): + def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False): """Return a list of object metadata dicts existing under a container.""" - logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range) - props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True) + logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public) + props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public) objects = [] for p in props: if len(p) == 2: @@ -516,14 +589,14 @@ class ModularBackend(BaseBackend): def list_object_permissions(self, user, account, container, prefix=''): """Return a list of paths that enforce permissions under a container.""" - logger.debug("list_object_permissions: %s %s %s", account, container, prefix) - return self._list_object_permissions(user, account, container, prefix, True) + logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix) + return self._list_object_permissions(user, account, container, prefix, True, False) @backend_method def list_object_public(self, user, account, container, prefix=''): """Return a dict mapping paths to public ids for objects that are public under a container.""" - logger.debug("list_object_public: %s %s %s", account, container, prefix) + logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix) public = {} for path, p in self.permissions.public_list('/'.join((account, container, prefix))): public[path] = p + ULTIMATE_ANSWER @@ -533,7 +606,7 @@ class ModularBackend(BaseBackend): def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True): """Return a dictionary with the object metadata for the domain.""" - logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version) + logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) props = self._get_version(node, version) @@ -545,7 +618,7 @@ class ModularBackend(BaseBackend): except NameError: # Object may be deleted. del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED) if del_props is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') modified = del_props[self.MTIME] meta = {} @@ -567,7 +640,7 @@ class ModularBackend(BaseBackend): def update_object_meta(self, user, account, container, name, domain, meta, replace=False): """Update the metadata associated with the object for the domain and return the new version.""" - logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace) + logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace) self._can_write(user, account, container, name) path, node = self._lookup_object(account, container, name) src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace) @@ -580,7 +653,7 @@ class ModularBackend(BaseBackend): from which the object gets its permissions from, along with a dictionary containing the permissions.""" - logger.debug("get_object_permissions: %s %s %s", account, container, name) + logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name) allowed = 'write' permissions_path = self._get_permissions_path(account, container, name) if user != account: @@ -597,18 +670,19 @@ class ModularBackend(BaseBackend): def update_object_permissions(self, user, account, container, name, permissions): """Update the permissions associated with the object.""" - logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions) + logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions) if user != account: raise NotAllowedError path = self._lookup_object(account, container, name)[0] self._check_permissions(path, permissions) self.permissions.access_set(path, permissions) + self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)}) @backend_method def get_object_public(self, user, account, container, name): """Return the public id of the object if applicable.""" - logger.debug("get_object_public: %s %s %s", account, container, name) + logger.debug("get_object_public: %s %s %s %s", user, account, container, name) self._can_read(user, account, container, name) path = self._lookup_object(account, container, name)[0] p = self.permissions.public_get(path) @@ -620,7 +694,7 @@ class ModularBackend(BaseBackend): def update_object_public(self, user, account, container, name, public): """Update the public status of the object.""" - logger.debug("update_object_public: %s %s %s %s", account, container, name, public) + logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public) self._can_write(user, account, container, name) path = self._lookup_object(account, container, name)[0] if not public: @@ -632,7 +706,7 @@ class ModularBackend(BaseBackend): def get_object_hashmap(self, user, account, container, name, version=None): """Return the object's size and a list with partial hashes.""" - logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version) + logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) props = self._get_version(node, version) @@ -667,10 +741,11 @@ class ModularBackend(BaseBackend): (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota): # This must be executed in a transaction, so the version is never created if it fails. raise QuotaError - self._report_size_change(user, account, size_delta, {'action': 'object update'}) + self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path}) if permissions is not None: self.permissions.access_set(path, permissions) + self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)}) self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'}) return dest_version_id @@ -679,7 +754,7 @@ class ModularBackend(BaseBackend): def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None): """Create/update an object with the specified size and partial hashes.""" - logger.debug("update_object_hashmap: %s %s %s %s %s %s %s", account, container, name, size, type, hashmap, checksum) + logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum) if size == 0: # No such thing as an empty hashmap. hashmap = [self.put_block('')] map = HashMap(self.block_size, self.hash_algorithm) @@ -699,7 +774,7 @@ class ModularBackend(BaseBackend): def update_object_checksum(self, user, account, container, name, version, checksum): """Update an object's checksum.""" - logger.debug("update_object_checksum: %s %s %s %s %s", account, container, name, version, checksum) + logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum) # Update objects with greater version and same hashmap and size (fix metadata updates). self._can_write(user, account, container, name) path, node = self._lookup_object(account, container, name) @@ -709,7 +784,8 @@ class ModularBackend(BaseBackend): if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]: self.node.version_put_property(x[self.SERIAL], 'checksum', checksum) - def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False): + def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None): + dest_version_ids = [] self._can_read(user, src_account, src_container, src_name) path, node = self._lookup_object(src_account, src_container, src_name) # TODO: Will do another fetch of the properties in duplicate version... @@ -717,32 +793,51 @@ class ModularBackend(BaseBackend): src_version_id = props[self.SERIAL] hash = props[self.HASH] size = props[self.SIZE] - is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid. - dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy) - return dest_version_id + dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)) + if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): + self._delete_object(user, src_account, src_container, src_name) + + if delimiter: + prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name + src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + src_names.sort(key=lambda x: x[2]) # order by nodes + paths = [elem[0] for elem in src_names] + nodes = [elem[2] for elem in src_names] + # TODO: Will do another fetch of the properties in duplicate version... + props = self._get_versions(nodes) # Check to see if source exists. + + for prop, path, node in zip(props, paths, nodes): + src_version_id = prop[self.SERIAL] + hash = prop[self.HASH] + vtype = prop[self.TYPE] + size = prop[self.SIZE] + dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name + vdest_name = path.replace(prefix, dest_prefix, 1) + dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy)) + if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): + self._delete_object(user, src_account, src_container, path) + return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids @backend_method - def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None): + def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None): """Copy an object's data and metadata.""" - logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version) - dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False) + logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter) + dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, False, delimiter) return dest_version_id @backend_method - def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None): + def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None): """Move an object's data and metadata.""" - logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions) + logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter) if user != src_account: raise NotAllowedError - dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True) - if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): - self._delete_object(user, src_account, src_container, src_name) + dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter) return dest_version_id - def _delete_object(self, user, account, container, name, until=None): + def _delete_object(self, user, account, container, name, until=None, delimiter=None): if user != account: raise NotAllowedError @@ -766,29 +861,44 @@ class ModularBackend(BaseBackend): props = self._get_version(node) except NameError: self.permissions.access_clear(path) - self._report_size_change(user, account, -size, {'action': 'object purge'}) + self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path}) return path, node = self._lookup_object(account, container, name) src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) del_size = self._apply_versioning(account, container, src_version_id) if del_size: - self._report_size_change(user, account, -del_size, {'action': 'object delete'}) + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) self._report_object_change(user, account, path, details={'action': 'object delete'}) self.permissions.access_clear(path) + + if delimiter: + prefix = name + delimiter if not name.endswith(delimiter) else name + src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + paths = [] + for t in src_names: + path = '/'.join((account, container, t[0])) + node = t[2] + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) + self._report_object_change(user, account, path, details={'action': 'object delete'}) + paths.append(path) + self.permissions.access_clear_bulk(paths) @backend_method - def delete_object(self, user, account, container, name, until=None): + def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None): """Delete/purge an object.""" - logger.debug("delete_object: %s %s %s %s", account, container, name, until) - self._delete_object(user, account, container, name, until) + logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter) + self._delete_object(user, account, container, name, until, delimiter) @backend_method def list_versions(self, user, account, container, name): """Return a list of all (version, version_timestamp) tuples for an object.""" - logger.debug("list_versions: %s %s %s", account, container, name) + logger.debug("list_versions: %s %s %s %s", user, account, container, name) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) versions = self.node.node_get_versions(node) @@ -798,7 +908,7 @@ class ModularBackend(BaseBackend): def get_uuid(self, user, uuid): """Return the (account, container, name) for the UUID given.""" - logger.debug("get_uuid: %s", uuid) + logger.debug("get_uuid: %s %s", user, uuid) info = self.node.latest_uuid(uuid) if info is None: raise NameError @@ -811,7 +921,7 @@ class ModularBackend(BaseBackend): def get_public(self, user, public): """Return the (account, container, name) for the public id given.""" - logger.debug("get_public: %s", public) + logger.debug("get_public: %s %s", user, public) if public is None or public < ULTIMATE_ANSWER: raise NameError path = self.permissions.public_path(public - ULTIMATE_ANSWER) @@ -828,7 +938,7 @@ class ModularBackend(BaseBackend): logger.debug("get_block: %s", hash) block = self.store.block_get(binascii.unhexlify(hash)) if not block: - raise NameError('Block does not exist') + raise ItemNotExists('Block does not exist') return block @backend_method(autocommit=0) @@ -875,16 +985,20 @@ class ModularBackend(BaseBackend): path = '/'.join((account, container)) node = self.node.node_lookup(path) if node is None: - raise NameError('Container does not exist') + raise ItemNotExists('Container does not exist') return path, node def _lookup_object(self, account, container, name): path = '/'.join((account, container, name)) node = self.node.node_lookup(path) if node is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') return path, node + def _lookup_objects(self, paths): + nodes = self.node.node_lookup_bulk(paths) + return paths, nodes + def _get_properties(self, node, until=None): """Return properties until the timestamp given.""" @@ -893,7 +1007,7 @@ class ModularBackend(BaseBackend): if props is None and until is not None: props = self.node.version_lookup(node, before, CLUSTER_HISTORY) if props is None: - raise NameError('Path does not exist') + raise ItemNotExists('Path does not exist') return props def _get_statistics(self, node, until=None): @@ -911,16 +1025,19 @@ class ModularBackend(BaseBackend): if version is None: props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) if props is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') else: try: version = int(version) except ValueError: - raise IndexError('Version does not exist') + raise VersionNotExists('Version does not exist') props = self.node.version_get_properties(version) if props is None or props[self.CLUSTER] == CLUSTER_DELETED: - raise IndexError('Version does not exist') + raise VersionNotExists('Version does not exist') return props + + def _get_versions(self, nodes): + return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL) def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False): """Create a new version of the node.""" @@ -1000,23 +1117,26 @@ class ModularBackend(BaseBackend): objects.extend([(p, None) for p in prefixes] if virtual else []) objects.sort(key=lambda x: x[0]) objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects] + return objects - start, limit = self._list_limits([x[0] for x in objects], marker, limit) - return objects[start:start + limit] - # Reporting functions. def _report_size_change(self, user, account, size, details={}): - logger.debug("_report_size_change: %s %s %s %s", user, account, size, details) account_node = self._lookup_account(account, True)[1] total = self._get_statistics(account_node)[1] details.update({'user': user, 'total': total}) - self.messages.append((account, 'diskspace', size, details)) + logger.debug("_report_size_change: %s %s %s %s", user, account, size, details) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details)) def _report_object_change(self, user, account, path, details={}): + details.update({'user': user}) logger.debug("_report_object_change: %s %s %s %s", user, account, path, details) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details)) + + def _report_sharing_change(self, user, account, path, details={}): + logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details) details.update({'user': user}) - self.messages.append((account, 'object', path, details)) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details)) # Policy functions.