X-Git-Url: https://code.grnet.gr/git/pithos/blobdiff_plain/ab0982ad6c707d6cc68bb084bae294de97a12549..a8b82467b88fb72117d7d7c283616037563e00c5:/snf-pithos-backend/pithos/backends/modular.py diff --git a/snf-pithos-backend/pithos/backends/modular.py b/snf-pithos-backend/pithos/backends/modular.py index 950c6db..0d7595b 100644 --- a/snf-pithos-backend/pithos/backends/modular.py +++ b/snf-pithos-backend/pithos/backends/modular.py @@ -36,17 +36,52 @@ import os import time import uuid as uuidlib import logging +import hashlib import binascii -from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend +from base import DEFAULT_QUOTA, DEFAULT_VERSIONING, NotAllowedError, QuotaError, BaseBackend, \ + AccountExists, ContainerExists, AccountNotEmpty, ContainerNotEmpty, ItemNotExists, VersionNotExists -from pithos.lib.hashmap import HashMap +# Stripped-down version of the HashMap class found in tools. +class HashMap(list): + + def __init__(self, blocksize, blockhash): + super(HashMap, self).__init__() + self.blocksize = blocksize + self.blockhash = blockhash + + def _hash_raw(self, v): + h = hashlib.new(self.blockhash) + h.update(v) + return h.digest() + + def hash(self): + if len(self) == 0: + return self._hash_raw('') + if len(self) == 1: + return self.__getitem__(0) + + h = list(self) + s = 2 + while s < len(h): + s = s * 2 + h += [('\x00' * len(h[0]))] * (s - len(h)) + while len(h) > 1: + h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)] + return h[0] # Default modules and settings. 
DEFAULT_DB_MODULE = 'pithos.backends.lib.sqlalchemy' DEFAULT_DB_CONNECTION = 'sqlite:///backend.db' DEFAULT_BLOCK_MODULE = 'pithos.backends.lib.hashfiler' DEFAULT_BLOCK_PATH = 'data/' +DEFAULT_BLOCK_UMASK = 0o022 +#DEFAULT_QUEUE_MODULE = 'pithos.backends.lib.rabbitmq' +#DEFAULT_QUEUE_CONNECTION = 'rabbitmq://guest:guest@localhost:5672/pithos' + +QUEUE_MESSAGE_KEY_PREFIX = 'pithos.%s' +QUEUE_CLIENT_ID = 'pithos' +QUEUE_INSTANCE_ID = '1' ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) @@ -69,7 +104,10 @@ def backend_method(func=None, autocommit=1): def fn(self, *args, **kw): self.wrapper.execute() try: + self.messages = [] ret = func(self, *args, **kw) + for m in self.messages: + self.queue.send(*m) self.wrapper.commit() return ret except: @@ -84,39 +122,61 @@ class ModularBackend(BaseBackend): Uses modules for SQL functions and storage. """ - def __init__(self, db_module=None, db_connection=None, block_module=None, block_path=None): + def __init__(self, db_module=None, db_connection=None, + block_module=None, block_path=None, block_umask=None, + queue_module=None, queue_connection=None): db_module = db_module or DEFAULT_DB_MODULE db_connection = db_connection or DEFAULT_DB_CONNECTION block_module = block_module or DEFAULT_BLOCK_MODULE block_path = block_path or DEFAULT_BLOCK_PATH + block_umask = block_umask or DEFAULT_BLOCK_UMASK + #queue_module = queue_module or DEFAULT_QUEUE_MODULE + #queue_connection = queue_connection or DEFAULT_QUEUE_CONNECTION self.hash_algorithm = 'sha256' self.block_size = 4 * 1024 * 1024 # 4MB self.default_policy = {'quota': DEFAULT_QUOTA, 'versioning': DEFAULT_VERSIONING} - __import__(db_module) - self.db_module = sys.modules[db_module] - self.wrapper = self.db_module.DBWrapper(db_connection) + def load_module(m): + __import__(m) + return sys.modules[m] + self.db_module = load_module(db_module) + self.wrapper = self.db_module.DBWrapper(db_connection) params = {'wrapper': self.wrapper} self.permissions = 
self.db_module.Permissions(**params) for x in ['READ', 'WRITE']: setattr(self, x, getattr(self.db_module, x)) self.node = self.db_module.Node(**params) - for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'UUID', 'CLUSTER']: + for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'TYPE', 'MTIME', 'MUSER', 'UUID', 'CHECKSUM', 'CLUSTER', 'MATCH_PREFIX', 'MATCH_EXACT']: setattr(self, x, getattr(self.db_module, x)) - __import__(block_module) - self.block_module = sys.modules[block_module] - + self.block_module = load_module(block_module) params = {'path': block_path, 'block_size': self.block_size, - 'hash_algorithm': self.hash_algorithm} + 'hash_algorithm': self.hash_algorithm, + 'umask': block_umask} self.store = self.block_module.Store(**params) + + if queue_module and queue_connection: + self.queue_module = load_module(queue_module) + params = {'exchange': queue_connection, + 'client_id': QUEUE_CLIENT_ID} + self.queue = self.queue_module.Queue(**params) + else: + class NoQueue: + def send(self, *args): + pass + + def close(self): + pass + + self.queue = NoQueue() def close(self): self.wrapper.close() + self.queue.close() @backend_method def list_accounts(self, user, marker=None, limit=10000): @@ -128,10 +188,10 @@ class ModularBackend(BaseBackend): return allowed[start:start + limit] @backend_method - def get_account_meta(self, user, account, domain, until=None): + def get_account_meta(self, user, account, domain, until=None, include_user_defined=True): """Return a dictionary with the account metadata for the domain.""" - logger.debug("get_account_meta: %s %s %s", account, domain, until) + logger.debug("get_account_meta: %s %s %s %s", user, account, domain, until) path, node = self._lookup_account(account, user == account) if user != account: if until or node is None or account not in self._allowed_accounts(user): @@ -154,7 +214,7 @@ class ModularBackend(BaseBackend): meta = {'name': account} else: meta = {} - if props is not None: + if props is not None 
and include_user_defined: meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain))) if until is not None: meta.update({'until_timestamp': tstamp}) @@ -166,7 +226,7 @@ class ModularBackend(BaseBackend): def update_account_meta(self, user, account, domain, meta, replace=False): """Update the metadata associated with the account for the domain.""" - logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace) + logger.debug("update_account_meta: %s %s %s %s %s", user, account, domain, meta, replace) if user != account: raise NotAllowedError path, node = self._lookup_account(account, True) @@ -176,7 +236,7 @@ class ModularBackend(BaseBackend): def get_account_groups(self, user, account): """Return a dictionary with the user groups defined for this account.""" - logger.debug("get_account_groups: %s", account) + logger.debug("get_account_groups: %s %s", user, account) if user != account: if account not in self._allowed_accounts(user): raise NotAllowedError @@ -188,7 +248,7 @@ class ModularBackend(BaseBackend): def update_account_groups(self, user, account, groups, replace=False): """Update the groups associated with the account.""" - logger.debug("update_account_groups: %s %s %s", account, groups, replace) + logger.debug("update_account_groups: %s %s %s %s", user, account, groups, replace) if user != account: raise NotAllowedError self._lookup_account(account, True) @@ -205,7 +265,7 @@ class ModularBackend(BaseBackend): def get_account_policy(self, user, account): """Return a dictionary with the account policy.""" - logger.debug("get_account_policy: %s", account) + logger.debug("get_account_policy: %s %s", user, account) if user != account: if account not in self._allowed_accounts(user): raise NotAllowedError @@ -217,7 +277,7 @@ class ModularBackend(BaseBackend): def update_account_policy(self, user, account, policy, replace=False): """Update the policy associated with the account.""" - logger.debug("update_account_policy: %s %s %s", 
account, policy, replace) + logger.debug("update_account_policy: %s %s %s %s", user, account, policy, replace) if user != account: raise NotAllowedError path, node = self._lookup_account(account, True) @@ -228,12 +288,12 @@ class ModularBackend(BaseBackend): def put_account(self, user, account, policy={}): """Create a new account with the given name.""" - logger.debug("put_account: %s %s", account, policy) + logger.debug("put_account: %s %s %s", user, account, policy) if user != account: raise NotAllowedError node = self.node.node_lookup(account) if node is not None: - raise NameError('Account already exists') + raise AccountExists('Account already exists') if policy: self._check_policy(policy) node = self._put_path(user, self.ROOTNODE, account) @@ -243,40 +303,63 @@ class ModularBackend(BaseBackend): def delete_account(self, user, account): """Delete the account with the given name.""" - logger.debug("delete_account: %s", account) + logger.debug("delete_account: %s %s", user, account) if user != account: raise NotAllowedError node = self.node.node_lookup(account) if node is None: return if not self.node.node_remove(node): - raise IndexError('Account is not empty') + raise AccountNotEmpty('Account is not empty') self.permissions.group_destroy(account) @backend_method - def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None): + def list_containers(self, user, account, marker=None, limit=10000, shared=False, until=None, public=False): """Return a list of containers existing under an account.""" - logger.debug("list_containers: %s %s %s %s %s", account, marker, limit, shared, until) + logger.debug("list_containers: %s %s %s %s %s %s %s", user, account, marker, limit, shared, until, public) if user != account: if until or account not in self._allowed_accounts(user): raise NotAllowedError allowed = self._allowed_containers(user, account) start, limit = self._list_limits(allowed, marker, limit) return allowed[start:start + limit] - if 
shared: - allowed = [x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)] - allowed = list(set(allowed)) + if shared or public: + allowed = set() + if shared: + allowed.update([x.split('/', 2)[1] for x in self.permissions.access_list_shared(account)]) + if public: + allowed.update([x[0].split('/', 2)[1] for x in self.permissions.public_list(account)]) + allowed = sorted(allowed) start, limit = self._list_limits(allowed, marker, limit) return allowed[start:start + limit] node = self.node.node_lookup(account) - return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)] + containers = [x[0] for x in self._list_object_properties(node, account, '', '/', marker, limit, False, None, [], until)] + start, limit = self._list_limits([x[0] for x in containers], marker, limit) + return containers[start:start + limit] + + @backend_method + def list_container_meta(self, user, account, container, domain, until=None): + """Return a list with all the container's object meta keys for the domain.""" + + logger.debug("list_container_meta: %s %s %s %s %s", user, account, container, domain, until) + allowed = [] + if user != account: + if until: + raise NotAllowedError + allowed = self.permissions.access_list_paths(user, '/'.join((account, container))) + if not allowed: + raise NotAllowedError + path, node = self._lookup_container(account, container) + before = until if until is not None else inf + allowed = self._get_formatted_paths(allowed) + return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed) @backend_method - def get_container_meta(self, user, account, container, domain, until=None): + def get_container_meta(self, user, account, container, domain, until=None, include_user_defined=True): """Return a dictionary with the container metadata for the domain.""" - logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until) + logger.debug("get_container_meta: %s %s %s 
%s %s", user, account, container, domain, until) if user != account: if until or container not in self._allowed_containers(user, account): raise NotAllowedError @@ -294,7 +377,9 @@ class ModularBackend(BaseBackend): if user != account: meta = {'name': container} else: - meta = dict(self.node.attribute_get(props[self.SERIAL], domain)) + meta = {} + if include_user_defined: + meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain))) if until is not None: meta.update({'until_timestamp': tstamp}) meta.update({'name': container, 'count': count, 'bytes': bytes}) @@ -305,17 +390,21 @@ class ModularBackend(BaseBackend): def update_container_meta(self, user, account, container, domain, meta, replace=False): """Update the metadata associated with the container for the domain.""" - logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace) + logger.debug("update_container_meta: %s %s %s %s %s %s", user, account, container, domain, meta, replace) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) - self._put_metadata(user, node, domain, meta, replace) + src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace) + if src_version_id is not None: + versioning = self._get_policy(node)['versioning'] + if versioning != 'auto': + self.node.version_remove(src_version_id) @backend_method def get_container_policy(self, user, account, container): """Return a dictionary with the container policy.""" - logger.debug("get_container_policy: %s %s", account, container) + logger.debug("get_container_policy: %s %s %s", user, account, container) if user != account: if container not in self._allowed_containers(user, account): raise NotAllowedError @@ -327,7 +416,7 @@ class ModularBackend(BaseBackend): def update_container_policy(self, user, account, container, policy, replace=False): """Update the policy associated with the container.""" - 
logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace) + logger.debug("update_container_policy: %s %s %s %s %s", user, account, container, policy, replace) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) @@ -338,7 +427,7 @@ class ModularBackend(BaseBackend): def put_container(self, user, account, container, policy={}): """Create a new container with the given name.""" - logger.debug("put_container: %s %s %s", account, container, policy) + logger.debug("put_container: %s %s %s %s", user, account, container, policy) if user != account: raise NotAllowedError try: @@ -346,7 +435,7 @@ class ModularBackend(BaseBackend): except NameError: pass else: - raise NameError('Container already exists') + raise ContainerExists('Container already exists') if policy: self._check_policy(policy) path = '/'.join((account, container)) @@ -354,70 +443,170 @@ class ModularBackend(BaseBackend): self._put_policy(node, policy, True) @backend_method - def delete_container(self, user, account, container, until=None): + def delete_container(self, user, account, container, until=None, prefix='', delimiter=None): """Delete/purge the container with the given name.""" - logger.debug("delete_container: %s %s %s", account, container, until) + logger.debug("delete_container: %s %s %s %s %s %s", user, account, container, until, prefix, delimiter) if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) if until is not None: - hashes = self.node.node_purge_children(node, until, CLUSTER_HISTORY) + hashes, size = self.node.node_purge_children(node, until, CLUSTER_HISTORY) for h in hashes: self.store.map_delete(h) self.node.node_purge_children(node, until, CLUSTER_DELETED) + self._report_size_change(user, account, -size, {'action': 'container purge', 'path':path}) return - if self._get_statistics(node)[0] > 0: - raise IndexError('Container is not empty') - hashes = 
self.node.node_purge_children(node, inf, CLUSTER_HISTORY) - for h in hashes: - self.store.map_delete(h) - self.node.node_purge_children(node, inf, CLUSTER_DELETED) - self.node.node_remove(node) + if not delimiter: + if self._get_statistics(node)[0] > 0: + raise ContainerNotEmpty('Container is not empty') + hashes, size = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) + for h in hashes: + self.store.map_delete(h) + self.node.node_purge_children(node, inf, CLUSTER_DELETED) + self.node.node_remove(node) + self._report_size_change(user, account, -size, {'action': 'container delete', 'path':path}) + else: + # remove only contents + src_names = self._list_objects_no_limit(user, account, container, prefix='', delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + paths = [] + for t in src_names: + path = '/'.join((account, container, t[0])) + node = t[2] + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) + self._report_object_change(user, account, path, details={'action': 'object delete'}) + paths.append(path) + self.permissions.access_clear_bulk(paths) + + def _list_objects(self, user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public): + if user != account and until: + raise NotAllowedError + if shared and public: + # get shared first + shared = self._list_object_permissions(user, account, container, prefix, shared=True, public=False) + objects = set() + if shared: + path, node = self._lookup_container(account, container) + shared = self._get_formatted_paths(shared) + objects |= set(self._list_object_properties(node, path, prefix, delimiter, marker, 
limit, virtual, domain, keys, until, size_range, shared, all_props)) + + # get public + objects |= set(self._list_public_object_properties(user, account, container, prefix, all_props)) + objects = list(objects) + + objects.sort(key=lambda x: x[0]) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] + elif public: + objects = self._list_public_object_properties(user, account, container, prefix, all_props) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] + + allowed = self._list_object_permissions(user, account, container, prefix, shared, public) + if shared and not allowed: + return [] + path, node = self._lookup_container(account, container) + allowed = self._get_formatted_paths(allowed) + objects = self._list_object_properties(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed, all_props) + start, limit = self._list_limits([x[0] for x in objects], marker, limit) + return objects[start:start + limit] - @backend_method - def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None): - """Return a list of objects existing under a container.""" + def _list_public_object_properties(self, user, account, container, prefix, all_props): + public = self._list_object_permissions(user, account, container, prefix, shared=False, public=True) + paths, nodes = self._lookup_objects(public) + path = '/'.join((account, container)) + cont_prefix = path + '/' + paths = [x[len(cont_prefix):] for x in paths] + props = self.node.version_lookup_bulk(nodes, all_props=all_props) + objects = [(path,) + props for path, props in zip(paths, props)] + return objects - logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, 
shared, until) + def _list_objects_no_limit(self, user, account, container, prefix, delimiter, virtual, domain, keys, shared, until, size_range, all_props, public): + objects = [] + while True: + marker = objects[-1] if objects else None + limit = 10000 + l = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, all_props, public) + objects.extend(l) + if not l or len(l) < limit: + break + return objects + + def _list_object_permissions(self, user, account, container, prefix, shared, public): allowed = [] + path = '/'.join((account, container, prefix)).rstrip('/') if user != account: - if until: - raise NotAllowedError - allowed = self.permissions.access_list_paths(user, '/'.join((account, container))) + allowed = self.permissions.access_list_paths(user, path) if not allowed: raise NotAllowedError else: + allowed = set() if shared: - allowed = self.permissions.access_list_shared('/'.join((account, container))) - if not allowed: - return [] - path, node = self._lookup_container(account, container) - return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, size_range, allowed) + allowed.update(self.permissions.access_list_shared(path)) + if public: + allowed.update([x[0] for x in self.permissions.public_list(path)]) + allowed = sorted(allowed) + if not allowed: + return [] + return allowed @backend_method - def list_object_meta(self, user, account, container, domain, until=None): - """Return a list with all the container's object meta keys for the domain.""" + def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False): + """Return a list of object (name, version_id) tuples existing under a container.""" - logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until) - allowed = [] - if user != 
account: - if until: - raise NotAllowedError - allowed = self.permissions.access_list_paths(user, '/'.join((account, container))) - if not allowed: - raise NotAllowedError - path, node = self._lookup_container(account, container) - before = until if until is not None else inf - return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed) + logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public) + return self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, False, public) + + @backend_method + def list_object_meta(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None, size_range=None, public=False): + """Return a list of object metadata dicts existing under a container.""" + + logger.debug("list_object_meta: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, public) + props = self._list_objects(user, account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until, size_range, True, public) + objects = [] + for p in props: + if len(p) == 2: + objects.append({'subdir': p[0]}) + else: + objects.append({'name': p[0], + 'bytes': p[self.SIZE + 1], + 'type': p[self.TYPE + 1], + 'hash': p[self.HASH + 1], + 'version': p[self.SERIAL + 1], + 'version_timestamp': p[self.MTIME + 1], + 'modified': p[self.MTIME + 1] if until is None else None, + 'modified_by': p[self.MUSER + 1], + 'uuid': p[self.UUID + 1], + 'checksum': p[self.CHECKSUM + 1]}) + return objects + + @backend_method + def list_object_permissions(self, user, account, container, prefix=''): + """Return a list of paths that enforce permissions under a container.""" + + 
logger.debug("list_object_permissions: %s %s %s %s", user, account, container, prefix) + return self._list_object_permissions(user, account, container, prefix, True, False) + + @backend_method + def list_object_public(self, user, account, container, prefix=''): + """Return a dict mapping paths to public ids for objects that are public under a container.""" + + logger.debug("list_object_public: %s %s %s %s", user, account, container, prefix) + public = {} + for path, p in self.permissions.public_list('/'.join((account, container, prefix))): + public[path] = p + return public @backend_method - def get_object_meta(self, user, account, container, name, domain, version=None): + def get_object_meta(self, user, account, container, name, domain, version=None, include_user_defined=True): """Return a dictionary with the object metadata for the domain.""" - logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version) + logger.debug("get_object_meta: %s %s %s %s %s %s", user, account, container, name, domain, version) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) props = self._get_version(node, version) @@ -429,20 +618,29 @@ class ModularBackend(BaseBackend): except NameError: # Object may be deleted. 
del_props = self.node.version_lookup(node, inf, CLUSTER_DELETED) if del_props is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') modified = del_props[self.MTIME] - meta = dict(self.node.attribute_get(props[self.SERIAL], domain)) - meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]}) - meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]}) - meta.update({'modified': modified, 'modified_by': props[self.MUSER], 'uuid': props[self.UUID]}) + meta = {} + if include_user_defined: + meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain))) + meta.update({'name': name, + 'bytes': props[self.SIZE], + 'type': props[self.TYPE], + 'hash': props[self.HASH], + 'version': props[self.SERIAL], + 'version_timestamp': props[self.MTIME], + 'modified': modified, + 'modified_by': props[self.MUSER], + 'uuid': props[self.UUID], + 'checksum': props[self.CHECKSUM]}) return meta @backend_method def update_object_meta(self, user, account, container, name, domain, meta, replace=False): """Update the metadata associated with the object for the domain and return the new version.""" - logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace) + logger.debug("update_object_meta: %s %s %s %s %s %s %s", user, account, container, name, domain, meta, replace) self._can_write(user, account, container, name) path, node = self._lookup_object(account, container, name) src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace) @@ -455,35 +653,36 @@ class ModularBackend(BaseBackend): from which the object gets its permissions from, along with a dictionary containing the permissions.""" - logger.debug("get_object_permissions: %s %s %s", account, container, name) + logger.debug("get_object_permissions: %s %s %s %s", user, account, container, name) allowed = 'write' + permissions_path = self._get_permissions_path(account, 
container, name) if user != account: - path = '/'.join((account, container, name)) - if self.permissions.access_check(path, self.WRITE, user): + if self.permissions.access_check(permissions_path, self.WRITE, user): allowed = 'write' - elif self.permissions.access_check(path, self.READ, user): + elif self.permissions.access_check(permissions_path, self.READ, user): allowed = 'read' else: raise NotAllowedError - path = self._lookup_object(account, container, name)[0] - return (allowed,) + self.permissions.access_inherit(path) + self._lookup_object(account, container, name) + return (allowed, permissions_path, self.permissions.access_get(permissions_path)) @backend_method def update_object_permissions(self, user, account, container, name, permissions): """Update the permissions associated with the object.""" - logger.debug("update_object_permissions: %s %s %s %s", account, container, name, permissions) + logger.debug("update_object_permissions: %s %s %s %s %s", user, account, container, name, permissions) if user != account: raise NotAllowedError path = self._lookup_object(account, container, name)[0] self._check_permissions(path, permissions) self.permissions.access_set(path, permissions) + self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)}) @backend_method def get_object_public(self, user, account, container, name): """Return the public id of the object if applicable.""" - logger.debug("get_object_public: %s %s %s", account, container, name) + logger.debug("get_object_public: %s %s %s %s", user, account, container, name) self._can_read(user, account, container, name) path = self._lookup_object(account, container, name)[0] p = self.permissions.public_get(path) @@ -495,7 +694,7 @@ class ModularBackend(BaseBackend): def update_object_public(self, user, account, container, name, public): """Update the public status of the object.""" - logger.debug("update_object_public: %s %s %s %s", account, container, name, public) + 
logger.debug("update_object_public: %s %s %s %s %s", user, account, container, name, public) self._can_write(user, account, container, name) path = self._lookup_object(account, container, name)[0] if not public: @@ -507,14 +706,14 @@ class ModularBackend(BaseBackend): def get_object_hashmap(self, user, account, container, name, version=None): """Return the object's size and a list with partial hashes.""" - logger.debug("get_object_hashmap: %s %s %s %s", account, container, name, version) + logger.debug("get_object_hashmap: %s %s %s %s %s", user, account, container, name, version) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) props = self._get_version(node, version) hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH])) return props[self.SIZE], [binascii.hexlify(x) for x in hashmap] - def _update_object_hash(self, user, account, container, name, size, hash, permissions, src_node=None, is_copy=False): + def _update_object_hash(self, user, account, container, name, size, type, hash, checksum, domain, meta, replace_meta, permissions, src_node=None, src_version_id=None, is_copy=False): if permissions is not None and user != account: raise NotAllowedError self._can_write(user, account, container, name) @@ -525,14 +724,16 @@ class ModularBackend(BaseBackend): account_path, account_node = self._lookup_account(account, True) container_path, container_node = self._lookup_container(account, container) path, node = self._put_object_node(container_path, container_node, name) - pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, hash=hash, is_copy=is_copy) + pre_version_id, dest_version_id = self._put_version_duplicate(user, node, src_node=src_node, size=size, type=type, hash=hash, checksum=checksum, is_copy=is_copy) + + # Handle meta. 
+ if src_version_id is None: + src_version_id = pre_version_id + self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta) # Check quota. - versioning = self._get_policy(container_node)['versioning'] - if versioning != 'auto': - size_delta = size - 0 # TODO: Get previous size. - else: - size_delta = size + del_size = self._apply_versioning(account, container, pre_version_id) + size_delta = size - del_size if size_delta > 0: account_quota = long(self._get_policy(account_node)['quota']) container_quota = long(self._get_policy(container_node)['quota']) @@ -540,17 +741,20 @@ class ModularBackend(BaseBackend): (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota): # This must be executed in a transaction, so the version is never created if it fails. raise QuotaError + self._report_size_change(user, account, size_delta, {'action': 'object update', 'path':path}) if permissions is not None: self.permissions.access_set(path, permissions) - self._apply_versioning(account, container, pre_version_id) - return pre_version_id, dest_version_id + self._report_sharing_change(user, account, path, {'members':self.permissions.access_members(path)}) + + self._report_object_change(user, account, path, details={'version': dest_version_id, 'action': 'object update'}) + return dest_version_id @backend_method - def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None): + def update_object_hashmap(self, user, account, container, name, size, type, hashmap, checksum, domain, meta={}, replace_meta=False, permissions=None): """Create/update an object with the specified size and partial hashes.""" - logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap) + logger.debug("update_object_hashmap: %s %s %s %s %s %s %s %s", user, account, container, name, size, type, hashmap, checksum) if size == 0: # No such thing 
as an empty hashmap. hashmap = [self.put_block('')] map = HashMap(self.block_size, self.hash_algorithm) @@ -562,12 +766,26 @@ class ModularBackend(BaseBackend): raise ie hash = map.hash() - pre_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions) - self._put_metadata_duplicate(pre_version_id, dest_version_id, domain, meta, replace_meta) + dest_version_id = self._update_object_hash(user, account, container, name, size, type, binascii.hexlify(hash), checksum, domain, meta, replace_meta, permissions) self.store.map_put(hash, map) return dest_version_id - def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False): + @backend_method + def update_object_checksum(self, user, account, container, name, version, checksum): + """Update an object's checksum.""" + + logger.debug("update_object_checksum: %s %s %s %s %s %s", user, account, container, name, version, checksum) + # Update objects with greater version and same hashmap and size (fix metadata updates). 
+ self._can_write(user, account, container, name) + path, node = self._lookup_object(account, container, name) + props = self._get_version(node, version) + versions = self.node.node_get_versions(node) + for x in versions: + if x[self.SERIAL] >= int(version) and x[self.HASH] == props[self.HASH] and x[self.SIZE] == props[self.SIZE]: + self.node.version_put_property(x[self.SERIAL], 'checksum', checksum) + + def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None, is_move=False, delimiter=None): + dest_version_ids = [] self._can_read(user, src_account, src_container, src_name) path, node = self._lookup_object(src_account, src_container, src_name) # TODO: Will do another fetch of the properties in duplicate version... @@ -575,32 +793,51 @@ class ModularBackend(BaseBackend): src_version_id = props[self.SERIAL] hash = props[self.HASH] size = props[self.SIZE] - is_copy = not is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name) # New uuid. 
- pre_version_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions, src_node=node, is_copy=is_copy) - self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta) - return dest_version_id + dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, dest_name, size, type, hash, None, dest_domain, dest_meta, replace_meta, permissions, src_node=node, src_version_id=src_version_id, is_copy=is_copy)) + if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): + self._delete_object(user, src_account, src_container, src_name) + + if delimiter: + prefix = src_name + delimiter if not src_name.endswith(delimiter) else src_name + src_names = self._list_objects_no_limit(user, src_account, src_container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + src_names.sort(key=lambda x: x[2]) # order by nodes + paths = [elem[0] for elem in src_names] + nodes = [elem[2] for elem in src_names] + # TODO: Will do another fetch of the properties in duplicate version... + props = self._get_versions(nodes) # Check to see if source exists. 
+ + for prop, path, node in zip(props, paths, nodes): + src_version_id = prop[self.SERIAL] + hash = prop[self.HASH] + vtype = prop[self.TYPE] + size = prop[self.SIZE] + dest_prefix = dest_name + delimiter if not dest_name.endswith(delimiter) else dest_name + vdest_name = path.replace(prefix, dest_prefix, 1) + dest_version_ids.append(self._update_object_hash(user, dest_account, dest_container, vdest_name, size, vtype, hash, None, dest_domain, meta={}, replace_meta=False, permissions=None, src_node=node, src_version_id=src_version_id, is_copy=is_copy)) + if is_move and (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): + self._delete_object(user, src_account, src_container, path) + return dest_version_ids[0] if len(dest_version_ids) == 1 else dest_version_ids @backend_method - def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None): + def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, src_version=None, delimiter=None): """Copy an object's data and metadata.""" - logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version) - return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version, False) + logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, src_version, delimiter) + dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, 
replace_meta, permissions, src_version, False, delimiter) + return dest_version_id @backend_method - def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None): + def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta={}, replace_meta=False, permissions=None, delimiter=None): """Move an object's data and metadata.""" - logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions) + logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s %s %s %s", user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, delimiter) if user != src_account: raise NotAllowedError - dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None, True) - if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name): - self._delete_object(user, src_account, src_container, src_name) + dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, type, domain, meta, replace_meta, permissions, None, True, delimiter) return dest_version_id - def _delete_object(self, user, account, container, name, until=None): + def _delete_object(self, user, account, container, name, until=None, delimiter=None): if user != account: raise NotAllowedError @@ -609,8 +846,14 @@ class ModularBackend(BaseBackend): node = self.node.node_lookup(path) if node is None: return - hashes = self.node.node_purge(node, until, CLUSTER_NORMAL) - hashes += self.node.node_purge(node, until, CLUSTER_HISTORY) + hashes = [] + size = 0 + h, s = self.node.node_purge(node, until, 
CLUSTER_NORMAL) + hashes += h + size += s + h, s = self.node.node_purge(node, until, CLUSTER_HISTORY) + hashes += h + size += s for h in hashes: self.store.map_delete(h) self.node.node_purge(node, until, CLUSTER_DELETED) @@ -618,25 +861,44 @@ class ModularBackend(BaseBackend): props = self._get_version(node) except NameError: self.permissions.access_clear(path) + self._report_size_change(user, account, -size, {'action': 'object purge', 'path':path}) return path, node = self._lookup_object(account, container, name) - src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, hash=None, cluster=CLUSTER_DELETED) - self._apply_versioning(account, container, src_version_id) + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) + self._report_object_change(user, account, path, details={'action': 'object delete'}) self.permissions.access_clear(path) + + if delimiter: + prefix = name + delimiter if not name.endswith(delimiter) else name + src_names = self._list_objects_no_limit(user, account, container, prefix, delimiter=None, virtual=False, domain=None, keys=[], shared=False, until=None, size_range=None, all_props=True, public=False) + paths = [] + for t in src_names: + path = '/'.join((account, container, t[0])) + node = t[2] + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size=0, type='', hash=None, checksum='', cluster=CLUSTER_DELETED) + del_size = self._apply_versioning(account, container, src_version_id) + if del_size: + self._report_size_change(user, account, -del_size, {'action': 'object delete', 'path':path}) + self._report_object_change(user, account, path, details={'action': 'object delete'}) + paths.append(path) + 
self.permissions.access_clear_bulk(paths) @backend_method - def delete_object(self, user, account, container, name, until=None): + def delete_object(self, user, account, container, name, until=None, prefix='', delimiter=None): """Delete/purge an object.""" - logger.debug("delete_object: %s %s %s %s", account, container, name, until) - self._delete_object(user, account, container, name, until) + logger.debug("delete_object: %s %s %s %s %s %s %s", user, account, container, name, until, prefix, delimiter) + self._delete_object(user, account, container, name, until, delimiter) @backend_method def list_versions(self, user, account, container, name): """Return a list of all (version, version_timestamp) tuples for an object.""" - logger.debug("list_versions: %s %s %s", account, container, name) + logger.debug("list_versions: %s %s %s %s", user, account, container, name) self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) versions = self.node.node_get_versions(node) @@ -646,7 +908,7 @@ class ModularBackend(BaseBackend): def get_uuid(self, user, uuid): """Return the (account, container, name) for the UUID given.""" - logger.debug("get_uuid: %s", uuid) + logger.debug("get_uuid: %s %s", user, uuid) info = self.node.latest_uuid(uuid) if info is None: raise NameError @@ -659,7 +921,7 @@ class ModularBackend(BaseBackend): def get_public(self, user, public): """Return the (account, container, name) for the public id given.""" - logger.debug("get_public: %s", public) + logger.debug("get_public: %s %s", user, public) if public is None or public < ULTIMATE_ANSWER: raise NameError path = self.permissions.public_path(public - ULTIMATE_ANSWER) @@ -676,7 +938,7 @@ class ModularBackend(BaseBackend): logger.debug("get_block: %s", hash) block = self.store.block_get(binascii.unhexlify(hash)) if not block: - raise NameError('Block does not exist') + raise ItemNotExists('Block does not exist') return block @backend_method(autocommit=0) @@ 
-710,7 +972,7 @@ class ModularBackend(BaseBackend): def _put_path(self, user, parent, path): node = self.node.node_create(parent, path) - self.node.version_create(node, None, 0, None, user, self._generate_uuid(), CLUSTER_NORMAL) + self.node.version_create(node, None, 0, '', None, user, self._generate_uuid(), '', CLUSTER_NORMAL) return node def _lookup_account(self, account, create=True): @@ -723,16 +985,20 @@ class ModularBackend(BaseBackend): path = '/'.join((account, container)) node = self.node.node_lookup(path) if node is None: - raise NameError('Container does not exist') + raise ItemNotExists('Container does not exist') return path, node def _lookup_object(self, account, container, name): path = '/'.join((account, container, name)) node = self.node.node_lookup(path) if node is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') return path, node + def _lookup_objects(self, paths): + nodes = self.node.node_lookup_bulk(paths) + return paths, nodes + def _get_properties(self, node, until=None): """Return properties until the timestamp given.""" @@ -741,7 +1007,7 @@ class ModularBackend(BaseBackend): if props is None and until is not None: props = self.node.version_lookup(node, before, CLUSTER_HISTORY) if props is None: - raise NameError('Path does not exist') + raise ItemNotExists('Path does not exist') return props def _get_statistics(self, node, until=None): @@ -759,18 +1025,21 @@ class ModularBackend(BaseBackend): if version is None: props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) if props is None: - raise NameError('Object does not exist') + raise ItemNotExists('Object does not exist') else: try: version = int(version) except ValueError: - raise IndexError('Version does not exist') + raise VersionNotExists('Version does not exist') props = self.node.version_get_properties(version) if props is None or props[self.CLUSTER] == CLUSTER_DELETED: - raise IndexError('Version does not exist') + raise 
VersionNotExists('Version does not exist') return props + + def _get_versions(self, nodes): + return self.node.version_lookup_bulk(nodes, inf, CLUSTER_NORMAL) - def _put_version_duplicate(self, user, node, src_node=None, size=None, hash=None, cluster=CLUSTER_NORMAL, is_copy=False): + def _put_version_duplicate(self, user, node, src_node=None, size=None, type=None, hash=None, checksum=None, cluster=CLUSTER_NORMAL, is_copy=False): """Create a new version of the node.""" props = self.node.version_lookup(node if src_node is None else src_node, inf, CLUSTER_NORMAL) @@ -778,13 +1047,21 @@ class ModularBackend(BaseBackend): src_version_id = props[self.SERIAL] src_hash = props[self.HASH] src_size = props[self.SIZE] + src_type = props[self.TYPE] + src_checksum = props[self.CHECKSUM] else: src_version_id = None src_hash = None src_size = 0 - if size is None: - hash = src_hash # This way hash can be set to None. + src_type = '' + src_checksum = '' + if size is None: # Set metadata. + hash = src_hash # This way hash can be set to None (account or container). 
size = src_size + if type is None: + type = src_type + if checksum is None: + checksum = src_checksum uuid = self._generate_uuid() if (is_copy or src_version_id is None) else props[self.UUID] if src_node is None: @@ -797,7 +1074,7 @@ class ModularBackend(BaseBackend): if pre_version_id is not None: self.node.version_recluster(pre_version_id, CLUSTER_HISTORY) - dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, uuid, cluster) + dest_version_id, mtime = self.node.version_create(node, hash, size, type, src_version_id, user, uuid, checksum, cluster) return pre_version_id, dest_version_id def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False): @@ -828,7 +1105,7 @@ class ModularBackend(BaseBackend): limit = 10000 return start, limit - def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[]): + def _list_object_properties(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, size_range=None, allowed=[], all_props=False): cont_prefix = path + '/' prefix = cont_prefix + prefix start = cont_prefix + marker if marker else None @@ -836,13 +1113,30 @@ class ModularBackend(BaseBackend): filterq = keys if domain else [] sizeq = size_range - objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq) + objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq, sizeq, all_props) objects.extend([(p, None) for p in prefixes] if virtual else []) objects.sort(key=lambda x: x[0]) - objects = [(x[0][len(cont_prefix):], x[1]) for x in objects] + objects = [(x[0][len(cont_prefix):],) + x[1:] for x in objects] + return objects - start, limit = self._list_limits([x[0] 
for x in objects], marker, limit) - return objects[start:start + limit] + # Reporting functions. + + def _report_size_change(self, user, account, size, details={}): + account_node = self._lookup_account(account, True)[1] + total = self._get_statistics(account_node)[1] + details.update({'user': user, 'total': total}) + logger.debug("_report_size_change: %s %s %s %s", user, account, size, details) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('resource.diskspace',), account, QUEUE_INSTANCE_ID, 'diskspace', float(size), details)) + + def _report_object_change(self, user, account, path, details={}): + details.update({'user': user}) + logger.debug("_report_object_change: %s %s %s %s", user, account, path, details) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('object',), account, QUEUE_INSTANCE_ID, 'object', path, details)) + + def _report_sharing_change(self, user, account, path, details={}): + logger.debug("_report_permissions_change: %s %s %s %s", user, account, path, details) + details.update({'user': user}) + self.messages.append((QUEUE_MESSAGE_KEY_PREFIX % ('sharing',), account, QUEUE_INSTANCE_ID, 'sharing', path, details)) # Policy functions. @@ -874,13 +1168,19 @@ class ModularBackend(BaseBackend): return policy def _apply_versioning(self, account, container, version_id): + """Delete the provided version if such is the policy. + Return size of object removed. + """ + if version_id is None: - return + return 0 path, node = self._lookup_container(account, container) versioning = self._get_policy(node)['versioning'] if versioning != 'auto': - hash = self.node.version_remove(version_id) + hash, size = self.node.version_remove(version_id) self.store.map_delete(hash) + return size + return 0 # Access control functions. @@ -890,18 +1190,49 @@ class ModularBackend(BaseBackend): def _check_permissions(self, path, permissions): # raise ValueError('Bad characters in permissions') - - # Check for existing permissions. 
- paths = self.permissions.access_list(path) - if paths: - ae = AttributeError() - ae.data = paths - raise ae + pass + + def _get_formatted_paths(self, paths): + formatted = [] + for p in paths: + node = self.node.node_lookup(p) + props = None + if node is not None: + props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) + if props is not None: + if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'): + formatted.append((p.rstrip('/') + '/', self.MATCH_PREFIX)) + formatted.append((p, self.MATCH_EXACT)) + return formatted + + def _get_permissions_path(self, account, container, name): + path = '/'.join((account, container, name)) + permission_paths = self.permissions.access_inherit(path) + permission_paths.sort() + permission_paths.reverse() + for p in permission_paths: + if p == path: + return p + else: + if p.count('/') < 2: + continue + node = self.node.node_lookup(p) + if node is not None: + props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) + if props is not None: + if props[self.TYPE].split(';', 1)[0].strip() in ('application/directory', 'application/folder'): + return p + return None def _can_read(self, user, account, container, name): if user == account: return True path = '/'.join((account, container, name)) + if self.permissions.public_get(path) is not None: + return True + path = self._get_permissions_path(account, container, name) + if not path: + raise NotAllowedError if not self.permissions.access_check(path, self.READ, user) and not self.permissions.access_check(path, self.WRITE, user): raise NotAllowedError @@ -909,6 +1240,9 @@ class ModularBackend(BaseBackend): if user == account: return True path = '/'.join((account, container, name)) + path = self._get_permissions_path(account, container, name) + if not path: + raise NotAllowedError if not self.permissions.access_check(path, self.WRITE, user): raise NotAllowedError