X-Git-Url: https://code.grnet.gr/git/pithos/blobdiff_plain/1ad56ff3aad5ff4fdfb1968b753b50d0c718256f..d14fe290caac1683b57e122d3a0af1064052a425:/pithos/backends/modular.py diff --git a/pithos/backends/modular.py b/pithos/backends/modular.py index 726dcf0..62ecdf6 100644 --- a/pithos/backends/modular.py +++ b/pithos/backends/modular.py @@ -34,12 +34,11 @@ import sys import os import time -import sqlite3 import logging import hashlib import binascii -from base import NotAllowedError, BaseBackend +from base import NotAllowedError, QuotaError, BaseBackend from lib.hashfiler import Mapper, Blocker ( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3) @@ -49,6 +48,35 @@ inf = float('inf') logger = logging.getLogger(__name__) + +class HashMap(list): + + def __init__(self, blocksize, blockhash): + super(HashMap, self).__init__() + self.blocksize = blocksize + self.blockhash = blockhash + + def _hash_raw(self, v): + h = hashlib.new(self.blockhash) + h.update(v) + return h.digest() + + def hash(self): + if len(self) == 0: + return self._hash_raw('') + if len(self) == 1: + return self.__getitem__(0) + + h = list(self) + s = 2 + while s < len(h): + s = s * 2 + h += [('\x00' * len(h[0]))] * (s - len(h)) + while len(h) > 1: + h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)] + return h[0] + + def backend_method(func=None, autocommit=1): if func is None: def fn(func): @@ -79,7 +107,7 @@ class ModularBackend(BaseBackend): self.hash_algorithm = 'sha256' self.block_size = 4 * 1024 * 1024 # 4MB - self.default_policy = {'quota': 0, 'versioning': 'auto'} + self.default_policy = {'quota': 0, 'versioning': 'manual'} if path and not os.path.exists(path): os.makedirs(path) @@ -88,6 +116,7 @@ class ModularBackend(BaseBackend): __import__(mod) self.mod = sys.modules[mod] + self.db = db self.wrapper = self.mod.dbwrapper.DBWrapper(db) params = {'blocksize': self.block_size, @@ -103,11 +132,13 @@ class ModularBackend(BaseBackend): self.permissions = self.mod.permissions.Permissions(**params) for x in ['READ', 'WRITE']: setattr(self, x, getattr(self.mod.permissions, x)) - self.policy = self.mod.policy.Policy(**params) self.node = self.mod.node.Node(**params) - for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']: + for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']: setattr(self, x, getattr(self.mod.node, x)) + def close(self): + self.wrapper.close() + @backend_method def list_accounts(self, user, marker=None, limit=10000): """Return a list of accounts the user can access.""" @@ -160,7 +191,7 @@ class ModularBackend(BaseBackend): if user != account: raise NotAllowedError path, node = self._lookup_account(account, True) - self._put_metadata(user, node, meta, replace, False) + self._put_metadata(user, node, meta, replace) @backend_method def get_account_groups(self, user, account): @@ -192,16 +223,42 @@ class ModularBackend(BaseBackend): self.permissions.group_addmany(account, k, v) @backend_method - def put_account(self, user, account): + def get_account_policy(self, user, account): + """Return a dictionary with the account policy.""" + + logger.debug("get_account_policy: %s", account) + if user != account: + if account not in self._allowed_accounts(user): + raise NotAllowedError + return {} + path, node = self._lookup_account(account, True) + return self._get_policy(node) + + @backend_method + def update_account_policy(self, user, account, policy, replace=False): + """Update the policy associated with the account.""" + + logger.debug("update_account_policy: %s %s %s", account, policy, replace) + if user != account: + raise NotAllowedError + path, node = self._lookup_account(account, True) + self._check_policy(policy) + self._put_policy(node, policy, replace) + + @backend_method + def put_account(self, user, account, policy={}): """Create a new account with the given name.""" - logger.debug("put_account: %s", account) + logger.debug("put_account: %s %s", account, policy) if user != account: raise NotAllowedError node = self.node.node_lookup(account) if node is not None: raise NameError('Account already exists') - self._put_path(user, self.ROOTNODE, account) + if policy: + self._check_policy(policy) + node = self._put_path(user, self.ROOTNODE, account) + self._put_policy(node, policy, True) @backend_method def delete_account(self, user, account): @@ -273,7 +330,7 @@ class ModularBackend(BaseBackend): if user != account: raise NotAllowedError path, node = self._lookup_container(account, container) - self._put_metadata(user, node, meta, replace, False) + self._put_metadata(user, node, meta, replace) @backend_method def get_container_policy(self, user, account, container): @@ -284,26 +341,22 @@ class ModularBackend(BaseBackend): if container not in self._allowed_containers(user, account): raise NotAllowedError return {} - path = self._lookup_container(account, container)[0] - return self.policy.policy_get(path) + path, node = self._lookup_container(account, container) + return self._get_policy(node) @backend_method def update_container_policy(self, user, account, container, policy, replace=False): - """Update the policy associated with the account.""" + """Update the policy associated with the container.""" logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace) if user != account: raise NotAllowedError - path = self._lookup_container(account, container)[0] + path, node = self._lookup_container(account, container) self._check_policy(policy) - if replace: - for k, v in self.default_policy.iteritems(): - if k not in policy: - policy[k] = v - self.policy.policy_set(path, policy) + self._put_policy(node, policy, replace) @backend_method - def put_container(self, user, account, container, policy=None): + def put_container(self, user, account, container, policy={}): """Create a new container with the given name.""" logger.debug("put_container: %s %s %s", account, container, policy) @@ -318,11 +371,8 @@ class ModularBackend(BaseBackend): if policy: self._check_policy(policy) path = '/'.join((account, container)) - self._put_path(user, self._lookup_account(account, True)[1], path) - for k, v in self.default_policy.iteritems(): - if k not in policy: - policy[k] = v - self.policy.policy_set(path, policy) + node = self._put_path(user, self._lookup_account(account, True)[1], path) + self._put_policy(node, policy, True) @backend_method def delete_container(self, user, account, container, until=None): @@ -334,20 +384,15 @@ class ModularBackend(BaseBackend): path, node = self._lookup_container(account, container) if until is not None: - versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY) - for v in versions: - self.mapper.map_remv(v) + self.node.node_purge_children(node, until, CLUSTER_HISTORY) self.node.node_purge_children(node, until, CLUSTER_DELETED) return if self._get_statistics(node)[0] > 0: raise IndexError('Container is not empty') - versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY) - for v in versions: - self.mapper.map_remv(v) + self.node.node_purge_children(node, inf, CLUSTER_HISTORY) self.node.node_purge_children(node, inf, CLUSTER_DELETED) self.node.node_remove(node) - self.policy.policy_unset(path) @backend_method def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None): @@ -480,28 +525,32 @@ class ModularBackend(BaseBackend): self._can_read(user, account, container, name) path, node = self._lookup_object(account, container, name) props = self._get_version(node, version) - hashmap = self.mapper.map_retr(props[self.SERIAL]) + hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH])) return props[self.SIZE], [binascii.hexlify(x) for x in hashmap] - @backend_method - def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None): - """Create/update an object with the specified size and partial hashes.""" - - logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap) + def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None): if permissions is not None and user != account: raise NotAllowedError self._can_write(user, account, container, name) - missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap]) - if missing: - ie = IndexError() - ie.data = [binascii.hexlify(x) for x in missing] - raise ie if permissions is not None: path = '/'.join((account, container, name)) self._check_permissions(path, permissions) - path, node = self._put_object_node(account, container, name) - src_version_id, dest_version_id = self._copy_version(user, node, None, node, size) - self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap]) + + account_path, account_node = self._lookup_account(account, True) + container_path, container_node = self._lookup_container(account, container) + path, node = self._put_object_node(container_path, container_node, name) + src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash) + + # Check quota. + size_delta = size # Change with versioning. + if size_delta > 0: + account_quota = long(self._get_policy(account_node)['quota']) + container_quota = long(self._get_policy(container_node)['quota']) + if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \ + (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota): + # This must be executed in a transaction, so the version is never created if it fails. + raise QuotaError + if not replace_meta and src_version_id is not None: self.node.attribute_copy(src_version_id, dest_version_id) self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems())) @@ -509,41 +558,60 @@ class ModularBackend(BaseBackend): self.permissions.access_set(path, permissions) return dest_version_id - def _copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None): - if permissions is not None and user != account: - raise NotAllowedError - self._can_read(user, account, src_container, src_name) - self._can_write(user, account, dest_container, dest_name) - src_path, src_node = self._lookup_object(account, src_container, src_name) - self._get_version(src_node, src_version) - if permissions is not None: - dest_path = '/'.join((account, container, name)) - self._check_permissions(dest_path, permissions) - dest_path, dest_node = self._put_object_node(account, dest_container, dest_name) - src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node) - if src_version_id is not None: - self._copy_data(src_version_id, dest_version_id) - if not replace_meta and src_version_id is not None: + @backend_method + def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None): + """Create/update an object with the specified size and partial hashes.""" + + logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap) + if size == 0: # No such thing as an empty hashmap. + hashmap = [self.put_block('')] + map = HashMap(self.block_size, self.hash_algorithm) + map.extend([binascii.unhexlify(x) for x in hashmap]) + missing = self.blocker.block_ping(map) + if missing: + ie = IndexError() + ie.data = [binascii.hexlify(x) for x in missing] + raise ie + + hash = map.hash() + dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions) + self.mapper.map_stor(hash, map) + return dest_version_id + + def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None): + self._can_read(user, src_account, src_container, src_name) + path, node = self._lookup_object(src_account, src_container, src_name) + props = self._get_version(node, src_version) + src_version_id = props[self.SERIAL] + hash = props[self.HASH] + size = props[self.SIZE] + + if replace_meta: + meta = dest_meta + else: + meta = {} + dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions) + if not replace_meta: self.node.attribute_copy(src_version_id, dest_version_id) - self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems())) - if permissions is not None: - self.permissions.access_set(dest_path, permissions) + self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems())) return dest_version_id @backend_method - def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None): + def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None): """Copy an object's data and metadata.""" - logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version) - return self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version) + logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version) + return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version) @backend_method - def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None): + def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None): """Move an object's data and metadata.""" - logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions) - dest_version_id = self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None) - self._delete_object(user, account, src_container, src_name) + logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions) + if user != src_account: + raise NotAllowedError + dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None) + self._delete_object(user, src_account, src_container, src_name) return dest_version_id def _delete_object(self, user, account, container, name, until=None): @@ -555,10 +623,8 @@ class ModularBackend(BaseBackend): node = self.node.node_lookup(path) if node is None: return - versions = self.node.node_purge(node, until, CLUSTER_NORMAL) - versions += self.node.node_purge(node, until, CLUSTER_HISTORY) - for v in versions: - self.mapper.map_remv(v) + self.node.node_purge(node, until, CLUSTER_NORMAL) + self.node.node_purge(node, until, CLUSTER_HISTORY) self.node.node_purge_children(node, until, CLUSTER_DELETED) try: props = self._get_version(node) @@ -569,7 +635,7 @@ class ModularBackend(BaseBackend): return path, node = self._lookup_object(account, container, name) - self._copy_version(user, node, None, node, 0, CLUSTER_DELETED) + src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED) self.permissions.access_clear(path) @backend_method @@ -619,8 +685,7 @@ class ModularBackend(BaseBackend): # Path functions. - def _put_object_node(self, account, container, name): - path, parent = self._lookup_container(account, container) + def _put_object_node(self, path, parent, name): path = '/'.join((path, name)) node = self.node.node_lookup(path) if node is None: @@ -629,7 +694,7 @@ class ModularBackend(BaseBackend): def _put_path(self, user, parent, path): node = self.node.node_create(parent, path) - self.node.version_create(node, 0, None, user, CLUSTER_NORMAL) + self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL) return node def _lookup_account(self, account, create=True): @@ -680,54 +745,42 @@ class ModularBackend(BaseBackend): if props is None: raise NameError('Object does not exist') else: + try: + version = int(version) + except ValueError: + raise IndexError('Version does not exist') props = self.node.version_get_properties(version) if props is None or props[self.CLUSTER] == CLUSTER_DELETED: raise IndexError('Version does not exist') return props - def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL): + def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL): + """Create a new version of the node.""" - # Get source serial and size. - if src_version is not None: - src_props = self._get_version(src_node, src_version) - src_version_id = src_props[self.SERIAL] - size = src_props[self.SIZE] + props = self.node.version_lookup(node, inf, CLUSTER_NORMAL) + if props is not None: + src_version_id = props[self.SERIAL] + src_hash = props[self.HASH] + src_size = props[self.SIZE] else: - # Latest or create from scratch. - try: - src_props = self._get_version(src_node) - src_version_id = src_props[self.SERIAL] - size = src_props[self.SIZE] - except NameError: - src_version_id = None - size = 0 - if dest_size is not None: - size = dest_size + src_version_id = None + src_hash = None + src_size = 0 + if size is None: + hash = src_hash # This way hash can be set to None. + size = src_size - # Move the latest version at destination to CLUSTER_HISTORY and create new. - if src_node == dest_node and src_version is None and src_version_id is not None: + if src_version_id is not None: self.node.version_recluster(src_version_id, CLUSTER_HISTORY) - else: - dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL) - if dest_props is not None: - self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY) - dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster) - + dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster) return src_version_id, dest_version_id - def _copy_data(self, src_version, dest_version): - hashmap = self.mapper.map_retr(src_version) - self.mapper.map_stor(dest_version, hashmap) - - def _get_metadata(self, version): - if version is None: - return {} - return dict(self.node.attribute_get(version)) - - def _put_metadata(self, user, node, meta, replace=False, copy_data=True): + def _put_metadata(self, user, node, meta, replace=False): """Create a new version and store metadata.""" - src_version_id, dest_version_id = self._copy_version(user, node, None, node) + src_version_id, dest_version_id = self._put_version_duplicate(user, node) + + # TODO: Merge with other functions that update metadata... if not replace: if src_version_id is not None: self.node.attribute_copy(src_version_id, dest_version_id) @@ -735,8 +788,6 @@ class ModularBackend(BaseBackend): self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != '')) else: self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems())) - if copy_data and src_version_id is not None: - self._copy_data(src_version_id, dest_version_id) return dest_version_id def _list_limits(self, listing, marker, limit): @@ -782,6 +833,18 @@ class ModularBackend(BaseBackend): else: raise ValueError + def _put_policy(self, node, policy, replace): + if replace: + for k, v in self.default_policy.iteritems(): + if k not in policy: + policy[k] = v + self.node.policy_set(node, policy) + + def _get_policy(self, node): + policy = self.default_policy.copy() + policy.update(self.node.policy_get(node)) + return policy + # Access control functions. def _check_groups(self, groups):