import os
import time
import logging
-import hashlib
import binascii
from base import NotAllowedError, QuotaError, BaseBackend
+from pithos.lib.hashmap import HashMap
+
+# Cluster ids a version can be placed in (passed as the cluster when duplicating a version
+# and compared against x[self.CLUSTER] when listing versions).
( CLUSTER_NORMAL, CLUSTER_HISTORY, CLUSTER_DELETED ) = range(3)
+# Used as the "before" bound when no 'until' timestamp is given.
inf = float('inf')
-
-logger = logging.getLogger(__name__)
+# Offset applied to public ids: added in get_object_public, subtracted again in get_public.
+ULTIMATE_ANSWER = 42
-class HashMap(list):
-
- def __init__(self, blocksize, blockhash):
- super(HashMap, self).__init__()
- self.blocksize = blocksize
- self.blockhash = blockhash
-
- def _hash_raw(self, v):
- h = hashlib.new(self.blockhash)
- h.update(v)
- return h.digest()
-
- def hash(self):
- if len(self) == 0:
- return self._hash_raw('')
- if len(self) == 1:
- return self.__getitem__(0)
-
- h = list(self)
- s = 2
- while s < len(h):
- s = s * 2
- h += [('\x00' * len(h[0]))] * (s - len(h))
- while len(h) > 1:
- h = [self._hash_raw(h[x] + h[x + 1]) for x in range(0, len(h), 2)]
- return h[0]
+logger = logging.getLogger(__name__)
def backend_method(func=None, autocommit=1):
"""
def __init__(self, db_module, db_connection, block_module, block_path):
+ db_module = db_module or 'pithos.backends.lib.sqlalchemy'
+ block_module = block_module or 'pithos.backends.lib.hashfiler'
+
self.hash_algorithm = 'sha256'
self.block_size = 4 * 1024 * 1024 # 4MB
- self.default_policy = {'quota': 0, 'versioning': 'manual'}
+ self.default_policy = {'quota': 0, 'versioning': 'auto'}
__import__(db_module)
self.db_module = sys.modules[db_module]
return allowed[start:start + limit]
@backend_method
- def get_account_meta(self, user, account, until=None):
- """Return a dictionary with the account metadata."""
+ def get_account_meta(self, user, account, domain, until=None):
+ """Return a dictionary with the account metadata for the domain."""
- logger.debug("get_account_meta: %s %s", account, until)
+ logger.debug("get_account_meta: %s %s %s", account, domain, until)
path, node = self._lookup_account(account, user == account)
if user != account:
if until or node is None or account not in self._allowed_accounts(user):
else:
meta = {}
if props is not None:
- meta.update(dict(self.node.attribute_get(props[self.SERIAL])))
+ meta.update(dict(self.node.attribute_get(props[self.SERIAL], domain)))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': account, 'count': count, 'bytes': bytes})
return meta
@backend_method
-def update_account_meta(self, user, account, meta, replace=False):
-"""Update the metadata associated with the account."""
+def update_account_meta(self, user, account, domain, meta, replace=False):
+"""Update the metadata associated with the account for the domain."""
-logger.debug("update_account_meta: %s %s %s", account, meta, replace)
+logger.debug("update_account_meta: %s %s %s %s", account, domain, meta, replace)
+# Only the account owner may change account metadata.
if user != account:
raise NotAllowedError
path, node = self._lookup_account(account, True)
-self._put_metadata(user, node, meta, replace)
+# _put_metadata returns (src_version_id, dest_version_id); neither is needed here.
+self._put_metadata(user, node, domain, meta, replace)
@backend_method
def get_account_groups(self, user, account):
start, limit = self._list_limits(allowed, marker, limit)
return allowed[start:start + limit]
node = self.node.node_lookup(account)
- return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, [], until)]
+ return [x[0] for x in self._list_objects(node, account, '', '/', marker, limit, False, None, [], until)]
@backend_method
- def get_container_meta(self, user, account, container, until=None):
- """Return a dictionary with the container metadata."""
+ def get_container_meta(self, user, account, container, domain, until=None):
+ """Return a dictionary with the container metadata for the domain."""
- logger.debug("get_container_meta: %s %s %s", account, container, until)
+ logger.debug("get_container_meta: %s %s %s %s", account, container, domain, until)
if user != account:
if until or container not in self._allowed_containers(user, account):
raise NotAllowedError
if user != account:
meta = {'name': container}
else:
- meta = dict(self.node.attribute_get(props[self.SERIAL]))
+ meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
if until is not None:
meta.update({'until_timestamp': tstamp})
meta.update({'name': container, 'count': count, 'bytes': bytes})
return meta
@backend_method
-def update_container_meta(self, user, account, container, meta, replace=False):
-"""Update the metadata associated with the container."""
+def update_container_meta(self, user, account, container, domain, meta, replace=False):
+"""Update the metadata associated with the container for the domain."""
-logger.debug("update_container_meta: %s %s %s %s", account, container, meta, replace)
+logger.debug("update_container_meta: %s %s %s %s %s", account, container, domain, meta, replace)
+# Only the account owner may change container metadata.
if user != account:
raise NotAllowedError
path, node = self._lookup_container(account, container)
-self._put_metadata(user, node, meta, replace)
+# _put_metadata returns (src_version_id, dest_version_id); neither is needed here.
+self._put_metadata(user, node, domain, meta, replace)
@backend_method
def get_container_policy(self, user, account, container):
self.node.node_remove(node)
@backend_method
- def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], shared=False, until=None):
+ def list_objects(self, user, account, container, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], shared=False, until=None):
"""Return a list of objects existing under a container."""
- logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, keys, shared, until)
+ logger.debug("list_objects: %s %s %s %s %s %s %s %s %s %s %s", account, container, prefix, delimiter, marker, limit, virtual, domain, keys, shared, until)
allowed = []
if user != account:
if until:
if not allowed:
return []
path, node = self._lookup_container(account, container)
- return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, keys, until, allowed)
+ return self._list_objects(node, path, prefix, delimiter, marker, limit, virtual, domain, keys, until, allowed)
@backend_method
- def list_object_meta(self, user, account, container, until=None):
- """Return a list with all the container's object meta keys."""
+ def list_object_meta(self, user, account, container, domain, until=None):
+ """Return a list with all the container's object meta keys for the domain."""
- logger.debug("list_object_meta: %s %s %s", account, container, until)
+ logger.debug("list_object_meta: %s %s %s %s", account, container, domain, until)
allowed = []
if user != account:
if until:
raise NotAllowedError
path, node = self._lookup_container(account, container)
before = until if until is not None else inf
- return self.node.latest_attribute_keys(node, before, CLUSTER_DELETED, allowed)
+ return self.node.latest_attribute_keys(node, domain, before, CLUSTER_DELETED, allowed)
@backend_method
- def get_object_meta(self, user, account, container, name, version=None):
- """Return a dictionary with the object metadata."""
+ def get_object_meta(self, user, account, container, name, domain, version=None):
+ """Return a dictionary with the object metadata for the domain."""
- logger.debug("get_object_meta: %s %s %s %s", account, container, name, version)
+ logger.debug("get_object_meta: %s %s %s %s %s", account, container, name, domain, version)
self._can_read(user, account, container, name)
path, node = self._lookup_object(account, container, name)
props = self._get_version(node, version)
raise NameError('Object does not exist')
modified = del_props[self.MTIME]
- meta = dict(self.node.attribute_get(props[self.SERIAL]))
+ meta = dict(self.node.attribute_get(props[self.SERIAL], domain))
meta.update({'name': name, 'bytes': props[self.SIZE], 'hash':props[self.HASH]})
meta.update({'version': props[self.SERIAL], 'version_timestamp': props[self.MTIME]})
meta.update({'modified': modified, 'modified_by': props[self.MUSER]})
return meta
@backend_method
-def update_object_meta(self, user, account, container, name, meta, replace=False):
-"""Update the metadata associated with the object."""
+def update_object_meta(self, user, account, container, name, domain, meta, replace=False):
+"""Update the metadata associated with the object for the domain and return the new version."""
-logger.debug("update_object_meta: %s %s %s %s %s", account, container, name, meta, replace)
+logger.debug("update_object_meta: %s %s %s %s %s %s", account, container, name, domain, meta, replace)
+# Write access is checked before the object is looked up.
self._can_write(user, account, container, name)
path, node = self._lookup_object(account, container, name)
-return self._put_metadata(user, node, meta, replace)
+# The metadata is written first (it is duplicated from the previous version), and only
+# then is the previous version handed to _apply_versioning, which removes it unless the
+# container's versioning policy is 'auto'.
+src_version_id, dest_version_id = self._put_metadata(user, node, domain, meta, replace)
+self._apply_versioning(account, container, src_version_id)
+return dest_version_id
@backend_method
def get_object_permissions(self, user, account, container, name):
@backend_method
def get_object_public(self, user, account, container, name):
-"""Return the public URL of the object if applicable."""
+"""Return the public id of the object if applicable."""
logger.debug("get_object_public: %s %s %s", account, container, name)
self._can_read(user, account, container, name)
path = self._lookup_object(account, container, name)[0]
-if self.permissions.public_check(path):
-return '/public/' + path
-return None
+# public_get yields None when the path has no public id; that None is returned unchanged.
+p = self.permissions.public_get(path)
+if p is not None:
+# Shift the stored id by ULTIMATE_ANSWER before exposing it; get_public subtracts it again.
+p += ULTIMATE_ANSWER
+return p
@backend_method
def update_object_public(self, user, account, container, name, public):
hashmap = self.store.map_get(binascii.unhexlify(props[self.HASH]))
return props[self.SIZE], [binascii.hexlify(x) for x in hashmap]
- def _update_object_hash(self, user, account, container, name, size, hash, meta={}, replace_meta=False, permissions=None):
+ def _update_object_hash(self, user, account, container, name, size, hash, permissions=None):
if permissions is not None and user != account:
raise NotAllowedError
self._can_write(user, account, container, name)
# This must be executed in a transaction, so the version is never created if it fails.
raise QuotaError
- if not replace_meta and src_version_id is not None:
- self.node.attribute_copy(src_version_id, dest_version_id)
- self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
if permissions is not None:
self.permissions.access_set(path, permissions)
- return dest_version_id
+ self._apply_versioning(account, container, src_version_id)
+ return src_version_id, dest_version_id
@backend_method
- def update_object_hashmap(self, user, account, container, name, size, hashmap, meta={}, replace_meta=False, permissions=None):
+ def update_object_hashmap(self, user, account, container, name, size, hashmap, domain, meta={}, replace_meta=False, permissions=None):
"""Create/update an object with the specified size and partial hashes."""
logger.debug("update_object_hashmap: %s %s %s %s %s", account, container, name, size, hashmap)
raise ie
hash = map.hash()
- dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), meta, replace_meta, permissions)
+ src_version_id, dest_version_id = self._update_object_hash(user, account, container, name, size, binascii.hexlify(hash), permissions)
+ self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace_meta)
self.store.map_put(hash, map)
return dest_version_id
-def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+def _copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_domain=None, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+"""Copy one version of the source object to the destination and return the new version id.
+
+The caller must be able to read the source; write access to the destination is checked
+by _update_object_hash. Attributes are duplicated from the source version, then dest_meta
+is applied to dest_domain (replacing that domain's keys when replace_meta is true).
+src_version selects the source version to copy (None is passed through to _get_version).
+"""
self._can_read(user, src_account, src_container, src_name)
path, node = self._lookup_object(src_account, src_container, src_name)
props = self._get_version(node, src_version)
+# Serial of the version being copied. This name was used below without ever being bound,
+# which raised NameError on every copy/move.
+src_version_id = props[self.SERIAL]
hash = props[self.HASH]
size = props[self.SIZE]
-if replace_meta:
-meta = dest_meta
-else:
-meta = {}
-dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, meta, True, permissions)
-if not replace_meta:
-self.node.attribute_copy(src_version_id, dest_version_id)
-self.node.attribute_set(dest_version_id, ((k, v) for k, v in dest_meta.iteritems()))
+# The first returned id is kept under a separate name so it does not clobber the copy source
+# (presumably it is the destination's previous version - confirm against _update_object_hash).
+src_v_id, dest_version_id = self._update_object_hash(user, dest_account, dest_container, dest_name, size, hash, permissions)
+self._put_metadata_duplicate(src_version_id, dest_version_id, dest_domain, dest_meta, replace_meta)
return dest_version_id
@backend_method
-def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None, src_version=None):
+def copy_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None, src_version=None):
"""Copy an object's data and metadata."""
-logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
-return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, src_version)
+logger.debug("copy_object: %s %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
+# Logs and delegates: the read check on the source and the copy itself happen in _copy_object,
+# which receives domain/meta as its dest_domain/dest_meta arguments.
+return self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, src_version)
@backend_method
-def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta={}, replace_meta=False, permissions=None):
+def move_object(self, user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta={}, replace_meta=False, permissions=None):
"""Move an object's data and metadata."""
-logger.debug("move_object: %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions)
+logger.debug("move_object: %s %s %s %s %s %s %s %s %s %s", src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions)
+# Only the owner of the source account may move an object out of it.
if user != src_account:
raise NotAllowedError
-dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, dest_meta, replace_meta, permissions, None)
-self._delete_object(user, src_account, src_container, src_name)
+# A move is a copy of the latest version (src_version=None) followed by a delete of the source.
+dest_version_id = self._copy_object(user, src_account, src_container, src_name, dest_account, dest_container, dest_name, domain, meta, replace_meta, permissions, None)
+# When source and destination name the same object, skip the delete so the move does not remove it.
+if (src_account, src_container, src_name) != (dest_account, dest_container, dest_name):
+self._delete_object(user, src_account, src_container, src_name)
return dest_version_id
def _delete_object(self, user, account, container, name, until=None):
path, node = self._lookup_object(account, container, name)
src_version_id, dest_version_id = self._put_version_duplicate(user, node, 0, None, CLUSTER_DELETED)
+ self._apply_versioning(account, container, src_version_id)
self.permissions.access_clear(path)
@backend_method
versions = self.node.node_get_versions(node)
return [[x[self.SERIAL], x[self.MTIME]] for x in versions if x[self.CLUSTER] != CLUSTER_DELETED]
+ @backend_method
+def get_public(self, user, public):
+"""Return the (account, container, name) for the public id given.
+
+Raises NameError when the id is None, is below the ULTIMATE_ANSWER offset, or does not
+map to a path. The read check on the resolved object is done by _can_read.
+"""
+logger.debug("get_public: %s", public)
+if public is None or public < ULTIMATE_ANSWER:
+raise NameError
+# Undo the offset that get_object_public adds before handing ids out.
+path = self.permissions.public_path(public - ULTIMATE_ANSWER)
+# NOTE(review): assumes public_path yields None for an unknown id (as public_get does for a
+# path without a public id) - confirm. Without this guard such an id fails with
+# AttributeError on path.split instead of the NameError used for invalid ids above.
+if path is None:
+raise NameError
+account, container, name = path.split('/', 2)
+self._can_read(user, account, container, name)
+return (account, container, name)
+
@backend_method(autocommit=0)
def get_block(self, hash):
"""Return a block's data."""
dest_version_id, mtime = self.node.version_create(node, hash, size, src_version_id, user, cluster)
return src_version_id, dest_version_id
- def _put_metadata(self, user, node, meta, replace=False):
+def _put_metadata_duplicate(self, src_version_id, dest_version_id, domain, meta, replace=False):
+"""Carry attributes over from src_version_id to dest_version_id and apply meta to the domain.
+
+With replace false, meta is merged: keys whose value is '' are deleted from the domain and
+all other keys are set. With replace true, attribute_del is called for the domain without
+a key list (presumably clearing the whole domain - confirm) and every item of meta is set,
+including ''-valued ones.
+"""
+# The copy is not domain-restricted and happens in both modes; only the given domain is
+# then edited on the destination version.
+if src_version_id is not None:
+self.node.attribute_copy(src_version_id, dest_version_id)
+if not replace:
+self.node.attribute_del(dest_version_id, domain, (k for k, v in meta.iteritems() if v == ''))
+self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems() if v != ''))
+else:
+self.node.attribute_del(dest_version_id, domain)
+self.node.attribute_set(dest_version_id, domain, ((k, v) for k, v in meta.iteritems()))
+
+def _put_metadata(self, user, node, domain, meta, replace=False):
"""Create a new version and store metadata."""
+# Returns (src_version_id, dest_version_id) as produced by _put_version_duplicate, so callers
+# such as update_object_meta can pass the superseded version to _apply_versioning.
src_version_id, dest_version_id = self._put_version_duplicate(user, node)
-
-# TODO: Merge with other functions that update metadata...
-if not replace:
-if src_version_id is not None:
-self.node.attribute_copy(src_version_id, dest_version_id)
-self.node.attribute_del(dest_version_id, (k for k, v in meta.iteritems() if v == ''))
-self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems() if v != ''))
-else:
-self.node.attribute_set(dest_version_id, ((k, v) for k, v in meta.iteritems()))
-return dest_version_id
+# The copy/delete/set logic formerly inlined here now lives in _put_metadata_duplicate.
+self._put_metadata_duplicate(src_version_id, dest_version_id, domain, meta, replace)
+return src_version_id, dest_version_id
def _list_limits(self, listing, marker, limit):
start = 0
limit = 10000
return start, limit
- def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, keys=[], until=None, allowed=[]):
+ def _list_objects(self, parent, path, prefix='', delimiter=None, marker=None, limit=10000, virtual=True, domain=None, keys=[], until=None, allowed=[]):
cont_prefix = path + '/'
prefix = cont_prefix + prefix
start = cont_prefix + marker if marker else None
before = until if until is not None else inf
- filterq = ','.join(keys) if keys else None
+ filterq = keys if domain else []
- objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, filterq)
+ objects, prefixes = self.node.latest_version_list(parent, prefix, delimiter, start, limit, before, CLUSTER_DELETED, allowed, domain, filterq)
objects.extend([(p, None) for p in prefixes] if virtual else [])
objects.sort(key=lambda x: x[0])
objects = [(x[0][len(cont_prefix):], x[1]) for x in objects]
if q < 0:
raise ValueError
elif k == 'versioning':
- if v not in ['auto', 'manual', 'none']:
+ if v not in ['auto', 'none']:
raise ValueError
else:
raise ValueError
policy.update(self.node.policy_get(node))
return policy
+def _apply_versioning(self, account, container, version_id):
+"""Remove the given version unless the container's versioning policy is 'auto'.
+
+Does nothing when version_id is None. Otherwise the container's policy is read and, for any
+'versioning' value other than 'auto', the version is removed from the node store and the
+map stored under the hash returned by version_remove is deleted.
+"""
+if version_id is None:
+return
+path, node = self._lookup_container(account, container)
+versioning = self._get_policy(node)['versioning']
+if versioning != 'auto':
+hash = self.node.version_remove(version_id)
+# NOTE(review): elsewhere map_get receives binascii.unhexlify(props[self.HASH]); confirm that
+# version_remove returns the hash in the form map_delete expects.
+self.store.map_delete(hash)
+
# Access control functions.
def _check_groups(self, groups):