import sys
import os
import time
-import sqlite3
import logging
import hashlib
import binascii
-from base import NotAllowedError, BaseBackend
+from base import NotAllowedError, QuotaError, BaseBackend
from lib.hashfiler import Mapper, Blocker
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
logger = logging.getLogger(__name__)
+
+class HashMap(list):
+    """A list of raw block hashes that can compute its tree (root) hash."""
+
+    def __init__(self, blocksize, blockhash):
+        super(HashMap, self).__init__()
+        self.blocksize = blocksize
+        self.blockhash = blockhash
+
+    def _hash_raw(self, v):
+        """Return the raw digest of *v* under the configured hash algorithm."""
+        hasher = hashlib.new(self.blockhash)
+        hasher.update(v)
+        return hasher.digest()
+
+    def hash(self):
+        """Return the root hash of the binary hash tree over the block hashes.
+
+        An empty map yields the digest of the empty string and a one-entry
+        map yields that entry unchanged. Otherwise the leaves are padded with
+        zero-filled hashes up to the next power of two (minimum 2), and
+        adjacent pairs are hashed together level by level until one remains.
+        """
+        count = len(self)
+        if count == 0:
+            return self._hash_raw('')
+        if count == 1:
+            return self[0]
+        width = 2
+        while width < count:
+            width *= 2
+        # Zero-filled padding entries have the same length as a real hash.
+        level = list(self) + [('\x00' * len(self[0]))] * (width - count)
+        while len(level) > 1:
+            level = [self._hash_raw(level[i] + level[i + 1])
+                     for i in range(0, len(level), 2)]
+        return level[0]
+
+
def backend_method(func=None, autocommit=1):
if func is None:
def fn(func):
self.hash_algorithm = 'sha256'
self.block_size = 4 * 1024 * 1024 # 4MB
- self.default_policy = {'quota': 0, 'versioning': 'auto'}
+ self.default_policy = {'quota': 0, 'versioning': 'manual'}
if path and not os.path.exists(path):
os.makedirs(path)
__import__(mod)
self.mod = sys.modules[mod]
+ self.db = db
self.wrapper = self.mod.dbwrapper.DBWrapper(db)
params = {'blocksize': self.block_size,
self.permissions = self.mod.permissions.Permissions(**params)
for x in ['READ', 'WRITE']:
setattr(self, x, getattr(self.mod.permissions, x))
- self.policy = self.mod.policy.Policy(**params)
self.node = self.mod.node.Node(**params)
- for x in ['ROOTNODE', 'SERIAL', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
+ for x in ['ROOTNODE', 'SERIAL', 'HASH', 'SIZE', 'MTIME', 'MUSER', 'CLUSTER']:
setattr(self, x, getattr(self.mod.node, x))
+    def close(self):
+        """Close the database wrapper created in the constructor."""
+        wrapper = self.wrapper
+        wrapper.close()
+
@backend_method
def list_accounts(self, user, marker=None, limit=10000):
"""Return a list of accounts the user can access."""
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
- self._put_metadata(user, node, meta, replace, False)
+ self._put_metadata(user, node, meta, replace)
@backend_method
def get_account_groups(self, user, account):
self.permissions.group_addmany(account, k, v)
@backend_method
- def put_account(self, user, account):
+    def get_account_policy(self, user, account):
+        """Return the account's policy as a dictionary.
+
+        The owner gets the stored policy layered over the defaults. Any other
+        user gets an empty dictionary if the account is one they may access,
+        and NotAllowedError otherwise.
+        """
+
+        logger.debug("get_account_policy: %s", account)
+        if user == account:
+            node = self._lookup_account(account, True)[1]
+            return self._get_policy(node)
+        if account not in self._allowed_accounts(user):
+            raise NotAllowedError
+        return {}
+
+    @backend_method
+    def update_account_policy(self, user, account, policy, replace=False):
+        """Validate *policy* and store it on the account's node.
+
+        Only the account owner may do this (NotAllowedError otherwise). The
+        *replace* flag is passed through to ``_put_policy``.
+        """
+
+        logger.debug("update_account_policy: %s %s %s", account, policy, replace)
+        if user != account:
+            raise NotAllowedError
+        account_node = self._lookup_account(account, True)[1]
+        self._check_policy(policy)
+        self._put_policy(account_node, policy, replace)
+
+ @backend_method
+ def put_account(self, user, account, policy={}):
"""Create a new account with the given name."""
- logger.debug("put_account: %s", account)
+ logger.debug("put_account: %s %s", account, policy)
if user != account:
raise NotAllowedError
node = self.node.node_lookup(account)
if node is not None:
raise NameError('Account already exists')
- self._put_path(user, self.ROOTNODE, account)
+ if policy:
+ self._check_policy(policy)
+ node = self._put_path(user, self.ROOTNODE, account)
+ self._put_policy(node, policy, True)
@backend_method
def delete_account(self, user, account):
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
- self._put_metadata(user, node, meta, replace, False)
+ self._put_metadata(user, node, meta, replace)
@backend_method
def get_container_policy(self, user, account, container):
if container not in self._allowed_containers(user, account):
raise NotAllowedError
return {}
- path = self._lookup_container(account, container)[0]
- return self.policy.policy_get(path)
+ path, node = self._lookup_container(account, container)
+ return self._get_policy(node)
@backend_method
def update_container_policy(self, user, account, container, policy, replace=False):
- """Update the policy associated with the account."""
+ """Update the policy associated with the container."""
logger.debug("update_container_policy: %s %s %s %s", account, container, policy, replace)
if user != account:
raise NotAllowedError
- path = self._lookup_container(account, container)[0]
+ path, node = self._lookup_container(account, container)
self._check_policy(policy)
- if replace:
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ self._put_policy(node, policy, replace)
@backend_method
- def put_container(self, user, account, container, policy=None):
+ def put_container(self, user, account, container, policy={}):
"""Create a new container with the given name."""
logger.debug("put_container: %s %s %s", account, container, policy)
if policy:
self._check_policy(policy)
path = '/'.join((account, container))
- self._put_path(user, self._lookup_account(account, True)[1], path)
- for k, v in self.default_policy.iteritems():
- if k not in policy:
- policy[k] = v
- self.policy.policy_set(path, policy)
+ node = self._put_path(user, self._lookup_account(account, True)[1], path)
+ self._put_policy(node, policy, True)
@backend_method
def delete_container(self, user, account, container, until=None):
path, node = self._lookup_container(account, container)
if until is not None:
- versions = self.node.node_purge_children(node, until, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
+ self.node.node_purge_children(node, until, CLUSTER_HISTORY)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
return
if self._get_statistics(node)[0] > 0:
raise IndexError('Container is not empty')
- versions = self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
+ self.node.node_purge_children(node, inf, CLUSTER_HISTORY)
self.node.node_purge_children(node, inf, CLUSTER_DELETED)
self.node.node_remove(node)
- self.policy.policy_unset(path)
@backend_method
def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
- hashmap = self.mapper.map_retr(props[self.SERIAL])
+ hashmap = self.mapper.map_retr(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
- @backend_method
- def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
- """Create/update an object with the specified size and partial hashes."""
-
- logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+ def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
- missing = self.blocker.block_ping([binascii.unhexlify(x) for x in hashmap])
- if missing:
- ie = IndexError()
- ie.data = [binascii.hexlify(x) for x in missing]
- raise ie
if permissions is not None:
path = '/'.join((account, container, name))
self._check_permissions(path, permissions)
- path, node = self._put_object_node(account, container, name)
- src_version_id, dest_version_id = self._copy_version(user, node, None, node, size)
- self.mapper.map_stor(dest_version_id, [binascii.unhexlify(x) for x in hashmap])
+
+ account_path, account_node = self._lookup_account(account, True)
+ container_path, container_node = self._lookup_container(account, container)
+ path, node = self._put_object_node(container_path, container_node, name)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, size, hash)
+
+ # Check quota.
+ size_delta = size # Change with versioning.
+ if size_delta > 0:
+ account_quota = long(self._get_policy(account_node)['quota'])
+ container_quota = long(self._get_policy(container_node)['quota'])
+ if (account_quota > 0 and self._get_statistics(account_node)[1] + size_delta > account_quota) or \
+ (container_quota > 0 and self._get_statistics(container_node)[1] + size_delta > container_quota):
+ # This must be executed in a transaction, so the version is never created if it fails.
+ raise QuotaError
+
if not replace_meta and src_version_id is not None:
self.node.attribute_copy(src_version_id, dest_version_id)
self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
self.permissions.access_set(path, permissions)
return dest_version_id
- def _copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
- if permissions is not None and user != account:
- raise NotAllowedError
- self._can_read(user, account, src_container, src_name)
- self._can_write(user, account, dest_container, dest_name)
- src_path, src_node = self._lookup_object(account, src_container, src_name)
- self._get_version(src_node, src_version)
- if permissions is not None:
- dest_path = '/'.join((account, container, name))
- self._check_permissions(dest_path, permissions)
- dest_path, dest_node = self._put_object_node(account, dest_container, dest_name)
- src_version_id, dest_version_id = self._copy_version(user, src_node, src_version, dest_node)
- if src_version_id is not None:
- self._copy_data(src_version_id, dest_version_id)
- if not replace_meta and src_version_id is not None:
+    @backend_method
+    def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+        """Create/update an object with the specified size and partial hashes.
+
+        *hashmap* is a list of hex-encoded block hashes. If ``block_ping``
+        reports any of them as missing, an IndexError is raised whose ``data``
+        attribute holds the missing hashes hex-encoded. Otherwise a new object
+        version is created under the tree hash of the blocks, the block list
+        is stored under that hash, and the new version id is returned.
+        """
+
+        logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
+        if size == 0: # No such thing as an empty hashmap.
+            hashmap = [self.put_block('')]
+        # Local names chosen so the ``map`` and ``hash`` builtins stay unshadowed.
+        blocks = HashMap(self.block_size, self.hash_algorithm)
+        blocks.extend([binascii.unhexlify(x) for x in hashmap])
+        missing = self.blocker.block_ping(blocks)
+        if missing:
+            ie = IndexError()
+            ie.data = [binascii.hexlify(x) for x in missing]
+            raise ie
+
+        # The object's content hash is the root of the block-hash tree.
+        root = blocks.hash()
+        dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(root), meta, replace_meta, permissions)
+        self.mapper.map_stor(root, blocks)
+        return dest_version_id
+
+ def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ self._can_read(user, src_account, src_container, src_name)
+ path, node = self._lookup_object(src_account, src_container, src_name)
+ props = self._get_version(node, src_version)
+ src_version_id = props[self.SERIAL]
+ hash = props[self.HASH]
+ size = props[self.SIZE]
+
+ if replace_meta:
+ meta = dest_meta
+ else:
+ meta = {}
+ dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
+ if not replace_meta:
self.node.attribute_copy(src_version_id, dest_version_id)
- self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
- if permissions is not None:
- self.permissions.access_set(dest_path, permissions)
+ self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
return dest_version_id
@backend_method
- def copy_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+ def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
"""Copy an object's data and metadata."""
- logger.debug("copy_object: %s %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
- return self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+ logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+ return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
@backend_method
- def move_object(self, user, account, src_container, src_name, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+ def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
"""Move an object's data and metadata."""
- logger.debug("move_object: %s %s %s %s %s %s %s %s", account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions)
- dest_version_id = self._copy_object(user, account, src_container, src_name, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
- self._delete_object(user, account, src_container, src_name)
+ logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
+ if user != src_account:
+ raise NotAllowedError
+ dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
+ self._delete_object(user, src_account, src_container, src_name)
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
node = self.node.node_lookup(path)
if node is None:
return
- versions = self.node.node_purge(node, until, CLUSTER_NORMAL)
- versions += self.node.node_purge(node, until, CLUSTER_HISTORY)
- for v in versions:
- self.mapper.map_remv(v)
+ self.node.node_purge(node, until, CLUSTER_NORMAL)
+ self.node.node_purge(node, until, CLUSTER_HISTORY)
self.node.node_purge_children(node, until, CLUSTER_DELETED)
try:
props = self._get_version(node)
return
path, node = self._lookup_object(account, container, name)
- self._copy_version(user, node, None, node, 0, CLUSTER_DELETED)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
self.permissions.access_clear(path)
@backend_method
# Path functions.
- def _put_object_node(self, account, container, name):
- path, parent = self._lookup_container(account, container)
+ def _put_object_node(self, path, parent, name):
path = '/'.join((path, name))
node = self.node.node_lookup(path)
if node is None:
def _put_path(self, user, parent, path):
node = self.node.node_create(parent, path)
- self.node.version_create(node, 0, None, user, CLUSTER_NORMAL)
+ self.node.version_create(node, None, 0, None, user, CLUSTER_NORMAL)
return node
def _lookup_account(self, account, create=True):
if props is None:
raise NameError('Object does not exist')
else:
+ try:
+ version = int(version)
+ except ValueError:
+ raise IndexError('Version does not exist')
props = self.node.version_get_properties(version)
if props is None or props[self.CLUSTER] == CLUSTER_DELETED:
raise IndexError('Version does not exist')
return props
- def _copy_version(self, user, src_node, src_version, dest_node, dest_size=None, dest_cluster=CLUSTER_NORMAL):
+ def _put_version_duplicate(self, user, node, size=None, hash=None, cluster=CLUSTER_NORMAL):
+ """Create a new version of the node."""
- # Get source serial and size.
- if src_version is not None:
- src_props = self._get_version(src_node, src_version)
- src_version_id = src_props[self.SERIAL]
- size = src_props[self.SIZE]
+ props = self.node.version_lookup(node, inf, CLUSTER_NORMAL)
+ if props is not None:
+ src_version_id = props[self.SERIAL]
+ src_hash = props[self.HASH]
+ src_size = props[self.SIZE]
else:
- # Latest or create from scratch.
- try:
- src_props = self._get_version(src_node)
- src_version_id = src_props[self.SERIAL]
- size = src_props[self.SIZE]
- except NameError:
- src_version_id = None
- size = 0
- if dest_size is not None:
- size = dest_size
+ src_version_id = None
+ src_hash = None
+ src_size = 0
+ if size is None:
+ hash = src_hash # This way hash can be set to None.
+ size = src_size
- # Move the latest version at destination to CLUSTER_HISTORY and create new.
- if src_node == dest_node and src_version is None and src_version_id is not None:
+ if src_version_id is not None:
self.node.version_recluster(src_version_id, CLUSTER_HISTORY)
- else:
- dest_props = self.node.version_lookup(dest_node, inf, CLUSTER_NORMAL)
- if dest_props is not None:
- self.node.version_recluster(dest_props[self.SERIAL], CLUSTER_HISTORY)
- dest_version_id, mtime = self.node.version_create(dest_node, size, src_version_id, user, dest_cluster)
-
+ dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
return src_version_id, dest_version_id
- def _copy_data(self, src_version, dest_version):
- hashmap = self.mapper.map_retr(src_version)
- self.mapper.map_stor(dest_version, hashmap)
-
- def _get_metadata(self, version):
- if version is None:
- return {}
- return dict(self.node.attribute_get(version))
-
- def _put_metadata(self, user, node, meta, replace=False, copy_data=True):
+ def _put_metadata(self, user, node, meta, replace=False):
"""Create a new version and store metadata."""
- src_version_id, dest_version_id = self._copy_version(user, node, None, node)
+ src_version_id, dest_version_id = self._put_version_duplicate(user, node)
+
+ # TODO: Merge with other functions that update metadata...
if not replace:
if src_version_id is not None:
self.node.attribute_copy(src_version_id, dest_version_id)
self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
else:
self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
- if copy_data and src_version_id is not None:
- self._copy_data(src_version_id, dest_version_id)
return dest_version_id
def _list_limits(self, listing, marker, limit):
else:
raise ValueError
+    def _put_policy(self, node, policy, replace):
+        """Store *policy* on *node*.
+
+        With *replace* set, keys absent from *policy* are filled in from
+        ``self.default_policy``. The caller's dictionary is never modified.
+        Callers such as put_account/put_container pass a shared ``{}`` default
+        argument, which would otherwise accumulate the default policy across
+        calls.
+        """
+        if replace:
+            # Defaults first, then the caller's keys override them.
+            merged = self.default_policy.copy()
+            merged.update(policy)
+            policy = merged
+        self.node.policy_set(node, policy)
+
+    def _get_policy(self, node):
+        """Return *node*'s stored policy layered over a copy of the defaults."""
+        result = dict(self.default_policy)
+        result.update(self.node.policy_get(node))
+        return result
+
# Access control functions.
def _check_groups(self, groups):